Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/03 15:17:32 UTC

[1/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats

Repository: flink
Updated Branches:
  refs/heads/master 19040a632 -> c34c7e412


http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 88c70c6..e0a66d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.runtime.batch;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
@@ -34,15 +35,22 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for interoperability with Avro types.
@@ -72,6 +80,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 							.setCity("Berlin")
 							.setState("Berlin")
 							.setZip("12049").build())
+			.setTypeBytes(ByteBuffer.allocate(10))
+			.setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+			.setTypeTimeMicros(123456)
+			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(123456L)
+			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
 
 	private static final User USER_2 = User.newBuilder()
@@ -88,7 +104,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeMap(new HashMap<>())
 			.setTypeFixed(new Fixed16())
 			.setTypeUnion(null)
-			.setTypeNested(null)
+			.setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeBytes(ByteBuffer.allocate(10))
+			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+			.setTypeTimeMicros(123456)
+			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(123456L)
+			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
 
 	private static final User USER_3 = User.newBuilder()
@@ -106,26 +129,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 			.setTypeFixed(new Fixed16())
 			.setTypeUnion(null)
 			.setTypeNested(null)
+			.setTypeBytes(ByteBuffer.allocate(10))
+			.setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+			.setTypeTimeMicros(123456)
+			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(123456L)
+			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
 
-	private static TypeInformation<Row> rowType = Types.ROW(
-			Types.GENERIC(Utf8.class),
-			Types.INT,
-			Types.GENERIC(Utf8.class),
-			Types.GENERIC(List.class),
-			Types.GENERIC(List.class),
-			Types.GENERIC(Object.class),
-			Types.DOUBLE,
-			Types.ENUM(Colors.class),
-			Types.GENERIC(Fixed16.class),
-			Types.LONG,
-			Types.GENERIC(Map.class),
-			Types.POJO(Address.class),
-			Types.GENERIC(Object.class),
-			Types.GENERIC(List.class),
-			Types.GENERIC(Object.class)
-	);
-
 	public AvroTypesITCase(
 			TestExecutionMode executionMode,
 			TableConfigMode tableConfigMode) {
@@ -135,19 +148,29 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 	@Test
 	public void testAvroToRow() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
+		env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
 		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		Table t = tEnv.fromDataSet(testData(env));
 		Table result = t.select("*");
 
-		List<Row> results = tEnv.toDataSet(result, rowType).collect();
-		String expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
-				"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
-				"blue,null,Charlie,[],[],false,1.337,RED," +
-				"null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
-				"\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
-				"yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
-				"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null";
+		List<Row> results = tEnv.toDataSet(result, Row.class).collect();
+		String expected =
+			"black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
+			"2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
+			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
+			"blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
+			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
+			"{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
+			"\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
+			"2014-03-01T12:12:12.321Z,null\n" +
+			"yellow,null,Terminator,[false],[world],false," +
+			"java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
+			"java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
+			"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
+			"12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
@@ -189,8 +212,8 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
 		Table result = t.select("*");
 
 		List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
-		String expected = USER_1 + "\n" + USER_2 + "\n" + USER_3;
-		TestBaseUtils.compareResultAsText(results, expected);
+		List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
+		assertEquals(expected, results);
 	}
 
 	private DataSet<User> testData(ExecutionEnvironment env) {

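(Side note for readers outside the test context: the two Kryo registrations added above are what any ExecutionEnvironment needs when Avro specific records carry Joda-Time logical date/time fields. A minimal sketch using only the classes and methods visible in this diff; the wrapper class itself is illustrative.)

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;

public class JodaKryoRegistrationSketch {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Avro 1.8 represents date/time logical types with Joda-Time classes,
		// which Kryo cannot handle out of the box; register the serializers
		// shipped with flink-avro before processing such records.
		env.getConfig().registerTypeWithKryoSerializer(
			LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
		env.getConfig().registerTypeWithKryoSerializer(
			LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
	}
}
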
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index f493d1f..70f8e95 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -28,9 +28,17 @@
      {"name": "type_map", "type": {"type": "map", "values": "long"}},
      {"name": "type_fixed",
                  "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
+                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}]},
      {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
+     {"name": "type_nested", "type": ["null", "Address"]},
+     {"name": "type_bytes", "type": "bytes"},
+     {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
+     {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}},
+     {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}},
+     {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+     {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
+     {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},
+     {"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}}
  ]
 },
  {"namespace": "org.apache.flink.formats.avro.generated",
@@ -40,4 +48,55 @@
       {"name": "name", "type": "string"},
       {"name": "optionalField",  "type": ["null", "int"], "default": null}
   ]
+},
+/**
+ * The BackwardsCompatibleAvroSerializer does not support custom Kryo
+ * registrations (which logical types require for Avro 1.8 because Kryo does not support Joda-Time).
+ * We introduce a simpler user record for pre-Avro 1.8 test cases. This record can be dropped when
+ * we drop support for 1.3 savepoints.
+ */
+{"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "SimpleUser",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": "double"},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
+     {"name": "type_enum", "type": "Colors"},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}},
+     {"name": "type_fixed", "type": ["null", "Fixed16"]},
+     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
+     {"name": "type_nested", "type": ["null", "Address"]},
+     {"name": "type_bytes", "type": "bytes"}
+ ]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+  "type": "record",
+  "name": "SchemaRecord",
+  "fields": [
+      {"name": "field1", "type": ["null", "long"], "default": null},
+      {"name": "field2", "type": ["null", "string"], "default": null},
+      {"name": "time1", "type": "long"},
+      {"name": "time2", "type": "long"},
+      {"name": "field3", "type": ["null", "double"], "default": null}
+  ]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+  "type": "record",
+  "name": "DifferentSchemaRecord",
+  "fields": [
+      {"name": "otherField1", "type": ["null", "long"], "default": null},
+      {"name": "otherField2", "type": ["null", "string"], "default": null},
+      {"name": "otherTime1", "type": "long"},
+      {"name": "otherField3", "type": ["null", "double"], "default": null},
+      {"name": "otherField4", "type": ["null", "float"], "default": null},
+      {"name": "otherField5", "type": ["null", "int"], "default": null}
+  ]
 }]

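(Note on the new decimal fields: Avro's decimal logical type stores only the unscaled value, while precision and scale come from the schema above. A small sketch of how the test data encodes the value 20.00 for type_decimal_bytes/type_decimal_fixed; the wrapper class is illustrative.)

import java.math.BigDecimal;
import java.nio.ByteBuffer;

public class DecimalEncodingSketch {
	public static void main(String[] args) {
		// 20.00 with scale 2, matching "precision": 4, "scale": 2 in the schema above.
		BigDecimal value = BigDecimal.valueOf(2000, 2);
		// Only the unscaled value (2000) is written; its two's-complement bytes
		// are [7, -48], which is exactly what the expected test output shows.
		ByteBuffer encoded = ByteBuffer.wrap(value.unscaledValue().toByteArray());
		System.out.println(encoded.remaining() + " bytes"); // 2 bytes, fits the Fixed2 field
	}
}
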
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
deleted file mode 100644
index 42eaf5d..0000000
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
deleted file mode 100644
index 0599305..0000000
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
new file mode 100644
index 0000000..23853cf
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
new file mode 100644
index 0000000..1474300
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index edc4b01..df52851 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -87,7 +87,6 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 		this(JsonSchemaConverter.convert(jsonSchema));
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public Row deserialize(byte[] message) throws IOException {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index 253b491..afc6506 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -88,11 +88,12 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
     (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } |
     (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME }
 
-  lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s =>
-    StringEscapeUtils.unescapeJava(s)
+  lazy val escapedFieldName: PackratParser[String] = stringLiteral ^^ { s =>
+    val unquoted = s.substring(1, s.length - 1)
+    StringEscapeUtils.unescapeJava(unquoted)
   }
 
-  lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident
+  lazy val fieldName: PackratParser[String] = escapedFieldName | ident
 
   lazy val field: PackratParser[(String, TypeInformation[_])] =
     fieldName ~ typeInfo ^^ {

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 29d647c..9ea8be0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.typeutils
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
-import org.junit.Assert.assertEquals
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Assert, Test}
 
 /**
   * Tests for string-based representation of [[TypeInformation]].
@@ -74,24 +76,18 @@ class TypeStringUtilsTest {
         Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
 
     testReadAndWrite(
-      "ROW(\"he llo\" DECIMAL, world TINYINT)",
-      Types.ROW(
-        Array[String]("he llo", "world"),
-        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
-
-    testReadAndWrite(
-      "ROW(\"he         \\nllo\" DECIMAL, world TINYINT)",
-      Types.ROW(
-        Array[String]("he         \nllo", "world"),
-        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
-
-    testReadAndWrite(
       "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
       TypeExtractor.createTypeInfo(classOf[Person]))
 
     testReadAndWrite(
       "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
       TypeExtractor.createTypeInfo(classOf[NonPojo]))
+
+    // test escaping
+    assertTrue(
+      TypeStringUtils.readTypeInfo("ROW(\"he         \\nllo\" DECIMAL, world TINYINT)")
+        .asInstanceOf[RowTypeInfo].getFieldNames
+        .sameElements(Array[String]("he         \nllo", "world")))
   }
 
   private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {


[3/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats

Posted by tw...@apache.org.
[FLINK-9444] [formats] Add full SQL support for Avro formats

This PR adds full support for Apache Avro records in the Table API & SQL. It adds (de)serialization schemas between Avro records and the row type for both specific and generic records, converts all Avro types to Flink types and vice versa, and supports both physical and logical Avro types. Either an Avro record class or an Avro schema string can be used for format initialization.

This closes #6218.
This closes #6082.
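
A rough usage sketch of the new format (the schema string is the one from the docs change below; User stands for any generated specific record class, and the wrapper class is illustrative):

import java.io.IOException;

import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.types.Row;

public class AvroFormatSketch {

	// Initialization from a generated specific record class ...
	public static Row decodeWithClass(byte[] message) throws IOException {
		AvroRowDeserializationSchema schema = new AvroRowDeserializationSchema(User.class);
		return schema.deserialize(message);
	}

	// ... or from a plain Avro schema string (generic records, no generated code needed).
	public static Row decodeWithSchemaString(byte[] message) throws IOException {
		AvroRowDeserializationSchema schema = new AvroRowDeserializationSchema(
			"{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
			+ "{\"name\": \"a\", \"type\": \"long\"},"
			+ "{\"name\": \"b\", \"type\": \"string\"}]}");
		return schema.deserialize(message);
	}
}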


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c34c7e41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c34c7e41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c34c7e41

Branch: refs/heads/master
Commit: c34c7e4127c8947d68e2b960cd84206e59d479b3
Parents: 19040a6
Author: Timo Walther <tw...@apache.org>
Authored: Tue Jun 26 11:46:06 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jul 3 15:40:44 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sqlClient.md                     |  73 +++-
 .../connectors/kafka/KafkaAvroTableSource.java  |   4 +-
 .../kafka/KafkaAvroTableSourceFactory.java      |   4 +-
 .../kafka/KafkaAvroTableSourceTestBase.java     |  83 +----
 .../flink/api/java/typeutils/RowTypeInfo.java   |  24 +-
 .../api/java/typeutils/RowTypeInfoTest.java     |  29 +-
 .../kryo/KryoWithCustomSerializersTest.java     |   8 +-
 flink-formats/flink-avro/pom.xml                |  37 +++
 .../avro/AvroRowDeserializationSchema.java      | 329 ++++++++++++++-----
 .../avro/AvroRowSerializationSchema.java        | 269 ++++++++++++---
 .../typeutils/AvroRecordClassConverter.java     |  81 -----
 .../avro/typeutils/AvroSchemaConverter.java     | 160 +++++++++
 .../avro/utils/AvroKryoSerializerUtils.java     |  70 ++++
 .../apache/flink/table/descriptors/Avro.java    |  23 +-
 .../flink/table/descriptors/AvroValidator.java  |  15 +-
 .../formats/avro/AvroOutputFormatITCase.java    |  41 ++-
 .../formats/avro/AvroOutputFormatTest.java      |  23 +-
 .../formats/avro/AvroRecordInputFormatTest.java |  73 ++--
 .../avro/AvroRowDeSerializationSchemaTest.java  | 127 +++++--
 .../avro/AvroSplittableInputFormatTest.java     | 115 +++++--
 .../flink/formats/avro/EncoderDecoderTest.java  | 157 +++++----
 .../avro/typeutils/AvroSchemaConverterTest.java | 116 +++++++
 .../avro/typeutils/AvroTypeExtractionTest.java  | 134 ++++----
 .../BackwardsCompatibleAvroSerializerTest.java  |  30 +-
 .../flink/formats/avro/utils/AvroTestUtils.java | 223 +++++++++----
 .../formats/avro/utils/TestDataGenerator.java   |  44 ++-
 .../flink/table/descriptors/AvroTest.java       |  19 +-
 .../table/runtime/batch/AvroTypesITCase.java    |  85 +++--
 .../src/test/resources/avro/user.avsc           |  63 +++-
 .../flink-1.3-avro-type-serialized-data         | Bin 23829 -> 0 bytes
 .../flink-1.3-avro-type-serializer-snapshot     | Bin 33772 -> 0 bytes
 .../flink-1.6-avro-type-serialized-data         | Bin 0 -> 23563 bytes
 .../flink-1.6-avro-type-serializer-snapshot     | Bin 0 -> 36411 bytes
 .../json/JsonRowDeserializationSchema.java      |   1 -
 .../flink/table/typeutils/TypeStringUtils.scala |   7 +-
 .../table/typeutils/TypeStringUtilsTest.scala   |  26 +-
 36 files changed, 1819 insertions(+), 674 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 2bdec2b..f850082 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -237,6 +237,7 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
 | :---------------- | :--------------------- |
 | CSV               | Built-in               |
 | JSON              | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
+| Apache Avro       | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
 
 {% endif %}
 
@@ -476,7 +477,7 @@ The CSV format is included in Flink and does not require an additional JAR file.
 
 #### JSON Format
 
-The JSON format allows to read JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
+The JSON format allows to read and write JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
 
 If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table's schema. Time attributes are ignored. A `from` definition in the table schema is interpreted as a field renaming in the format.
 
@@ -507,6 +508,23 @@ format:
   derive-schema: true
 {% endhighlight %}
 
+The following table shows the mapping of JSON schema types to Flink SQL types:
+
+| JSON schema                       | Flink SQL               |
+| :-------------------------------- | :---------------------- |
+| `object`                          | `ROW`                   |
+| `boolean`                         | `BOOLEAN`               |
+| `array`                           | `ARRAY[_]`              |
+| `number`                          | `DECIMAL`               |
+| `integer`                         | `DECIMAL`               |
+| `string`                          | `VARCHAR`               |
+| `string` with `format: date-time` | `TIMESTAMP`             |
+| `string` with `format: date`      | `DATE`                  |
+| `string` with `format: time`      | `TIME`                  |
+| `string` with `encoding: base64`  | `ARRAY[TINYINT]`        |
+| `null`                            | `NULL` (unsupported yet)|
+
+
 Currently, Flink supports only a subset of the [JSON schema specification](http://json-schema.org/) `draft-07`. Union types (as well as `allOf`, `anyOf`, `not`) are not supported yet. `oneOf` and arrays of types are only supported for specifying nullability.
 
 Simple references that link to a common definition in the document are supported as shown in the more complex example below:
@@ -558,6 +576,59 @@ Simple references that link to a common definition in the document are supported
 
 Make sure to download the [JSON SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.
 
+#### Apache Avro Format
+
+The [Apache Avro](https://avro.apache.org/) format allows to read and write Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.
+
+{% highlight yaml %}
+format:
+  type: avro
+
+  # required: define the schema either by using an Avro specific record class
+  record-class: "org.organization.types.User"
+
+  # or by using an Avro schema
+  avro-schema: >
+    {
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }
+{% endhighlight %}
+
+Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an `ANY` type. The following table shows the mapping:
+
+| Avro schema                                 | Flink SQL               |
+| :------------------------------------------ | :---------------------- |
+| `record`                                    | `ROW`                   |
+| `enum`                                      | `VARCHAR`               |
+| `array`                                     | `ARRAY[_]`              |
+| `map`                                       | `MAP[VARCHAR, _]`       |
+| `union`                                     | non-null type or `ANY`  |
+| `fixed`                                     | `ARRAY[TINYINT]`        |
+| `string`                                    | `VARCHAR`               |
+| `bytes`                                     | `ARRAY[TINYINT]`        |
+| `int`                                       | `INT`                   |
+| `long`                                      | `BIGINT`                |
+| `float`                                     | `FLOAT`                 |
+| `double`                                    | `DOUBLE`                |
+| `boolean`                                   | `BOOLEAN`               |
+| `int` with `logicalType: date`              | `DATE`                  |
+| `int` with `logicalType: time-millis`       | `TIME`                  |
+| `int` with `logicalType: time-micros`       | `INT`                   |
+| `long` with `logicalType: timestamp-millis` | `TIMESTAMP`             |
+| `long` with `logicalType: timestamp-micros` | `BIGINT`                |
+| `bytes` with `logicalType: decimal`         | `DECIMAL`               |
+| `fixed` with `logicalType: decimal`         | `DECIMAL`               |
+| `null`                                      | `NULL` (unsupported yet)|
+
+Avro uses [Joda-Time](http://www.joda.org/joda-time/) for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink's SQL JAR distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.
+
+Make sure to download the [Apache Avro SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.
+
 {% top %}
 
 Limitations & Future

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 7828a1c..8c8ce32 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -64,7 +64,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
 			topic,
 			properties,
 			schema,
-			AvroRecordClassConverter.convert(avroRecordClass));
+			AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
 
 		this.avroRecordClass = avroRecordClass;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
index 1401914..8ef7270 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.AvroValidator;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -65,7 +65,7 @@ public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactor
 		final Class<? extends SpecificRecordBase> avroRecordClass =
 				params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
 		builder.forAvroRecordClass(avroRecordClass);
-		final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroRecordClassConverter.convert(avroRecordClass));
+		final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
 
 		// field mapping
 		final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));

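(A minimal sketch of the renamed converter in isolation, mirroring the two call sites above; the generated User record and the wrapper class are illustrative.)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public class SchemaConversionSketch {
	public static TableSchema deriveSchema() {
		// Derive Flink type information directly from the Avro specific record class ...
		TypeInformation<Row> type = AvroSchemaConverter.convertToTypeInfo(User.class);
		// ... and turn it into a table schema, as the Kafka table source factory does above.
		return TableSchema.fromTypeInfo(type);
	}
}
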
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index 16beb7d..f86fc95 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -18,17 +18,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
+import org.apache.flink.formats.avro.generated.SchemaRecord;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Types;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
-import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,7 +41,7 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 	@Override
 	protected void configureBuilder(KafkaTableSource.Builder builder) {
 		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
 	}
 
 	@Test
@@ -67,8 +64,8 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		// check field types
 		assertEquals(Types.LONG(), returnType.getTypeAt(0));
 		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+		assertEquals(Types.LONG(), returnType.getTypeAt(2));
+		assertEquals(Types.LONG(), returnType.getTypeAt(3));
 		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
 
 		// check field mapping
@@ -91,7 +88,7 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		mapping.put("field3", "otherField3");
 
 		// set Avro class with different fields
-		b.forAvroRecordClass(DifferentFieldsAvroClass.class);
+		b.forAvroRecordClass(DifferentSchemaRecord.class);
 		b.withTableToAvroMapping(mapping);
 
 		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
@@ -110,9 +107,9 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		// check field types
 		assertEquals(Types.LONG(), returnType.getTypeAt(0));
 		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+		assertEquals(Types.LONG(), returnType.getTypeAt(2));
 		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
-		assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+		assertEquals(Types.FLOAT(), returnType.getTypeAt(4));
 		assertEquals(Types.INT(), returnType.getTypeAt(5));
 
 		// check field mapping
@@ -127,68 +124,4 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		assertEquals(source.getReturnType(),
 			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
 	}
-
-	/**
-	 * Avro record that matches the table schema.
-	 */
-	@SuppressWarnings("unused")
-	public static class SameFieldsAvroClass extends SpecificRecordBase {
-
-		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
-		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
-		//CHECKSTYLE.ON: StaticVariableNameCheck
-
-		public Long field1;
-		public String field2;
-		public Timestamp time1;
-		public Timestamp time2;
-		public Double field3;
-
-		@Override
-		public Schema getSchema() {
-			return null;
-		}
-
-		@Override
-		public Object get(int field) {
-			return null;
-		}
-
-		@Override
-		public void put(int field, Object value) { }
-	}
-
-	/**
-	 * Avro record that does NOT match the table schema.
-	 */
-	@SuppressWarnings("unused")
-	public static class DifferentFieldsAvroClass extends SpecificRecordBase {
-
-		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
-		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(
-			new String[]{"otherField1", "otherField2", "otherTime1", "otherField3", "otherField4", "otherField5"},
-			new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.BYTE(), Types.INT()});
-		//CHECKSTYLE.ON: StaticVariableNameCheck
-
-		public Long otherField1;
-		public String otherField2;
-		public Timestamp otherTime1;
-		public Double otherField3;
-		public Byte otherField4;
-		public Integer otherField5;
-
-		@Override
-		public Schema getSchema() {
-			return null;
-		}
-
-		@Override
-		public Object get(int field) {
-			return null;
-		}
-
-		@Override
-		public void put(int field, Object value) { }
-	}
-
 }

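(Sketch of how a builder is configured with a generated record class and a table-to-Avro field mapping, using only the builder methods exercised in the test above; obtaining the version-specific builder instance is out of scope here, and the wrapper class is illustrative.)

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaAvroTableSource;

public class AvroTableSourceConfigSketch {
	public static KafkaAvroTableSource configure(KafkaAvroTableSource.Builder b) {
		// Map table field names to the differently named fields of the Avro record.
		Map<String, String> mapping = new HashMap<>();
		mapping.put("field1", "otherField1");
		mapping.put("field2", "otherField2");
		mapping.put("time1", "otherTime1");
		mapping.put("field3", "otherField3");
		b.forAvroRecordClass(DifferentSchemaRecord.class);
		b.withTableToAvroMapping(mapping);
		return (KafkaAvroTableSource) b.build();
	}
}
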
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 24ccfb1..75c28ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -249,7 +249,22 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 
 	@Override
 	public int hashCode() {
-		return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+		return 31 * super.hashCode();
+	}
+
+	/**
+	 * The equals method does only check for field types. Field names do not matter during
+	 * runtime so we can consider rows with the same field types as equal.
+	 * Use {@link RowTypeInfo#schemaEquals(Object)} for checking schema-equivalence.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof RowTypeInfo) {
+			final RowTypeInfo other = (RowTypeInfo) obj;
+			return other.canEqual(this) && super.equals(other);
+		} else {
+			return false;
+		}
 	}
 
 	@Override
@@ -274,6 +289,13 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 		return types;
 	}
 
+	/**
+	 * Tests whether an other object describes the same, schema-equivalent row information.
+	 */
+	public boolean schemaEquals(Object obj) {
+		return equals(obj) && Arrays.equals(fieldNames, ((RowTypeInfo) obj).fieldNames);
+	}
+
 	private boolean hasDuplicateFieldNames(String[] fieldNames) {
 		HashSet<String> names = new HashSet<>();
 		for (String field : fieldNames) {

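(Illustration of the changed semantics, assuming only the constructor already used in the test below: with this change, equals() ignores field names, while the new schemaEquals() also compares them.)

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualitySketch {
	public static void main(String[] args) {
		RowTypeInfo a = new RowTypeInfo(
			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[]{"id", "name"});
		RowTypeInfo b = new RowTypeInfo(
			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[]{"key", "value"});

		System.out.println(a.equals(b));       // true:  only field types are compared at runtime
		System.out.println(a.schemaEquals(b)); // false: field names differ
	}
}
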
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
index 03d1e04..f17ca95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -28,7 +29,8 @@ import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for {@link RowTypeInfo}.
@@ -47,7 +49,10 @@ public class RowTypeInfoTest extends TypeInformationTestBase<RowTypeInfo> {
 		return new RowTypeInfo[] {
 			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
 			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO),
-			new RowTypeInfo(typeList)
+			new RowTypeInfo(typeList),
+			new RowTypeInfo(
+				new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO},
+				new String[]{"int", "int2"})
 		};
 	}
 
@@ -123,4 +128,24 @@ public class RowTypeInfoTest extends TypeInformationTestBase<RowTypeInfo> {
 		assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
 	}
 
+	@Test
+	public void testSchemaEquals() {
+		final RowTypeInfo row1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		final RowTypeInfo row2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertTrue(row1.schemaEquals(row2));
+
+		final RowTypeInfo other1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"otherField", "field2"});
+		final RowTypeInfo other2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertFalse(row1.schemaEquals(other1));
+		assertFalse(row1.schemaEquals(other2));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
index d68afd6..e28221f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
@@ -55,9 +55,9 @@ public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializer
 		TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
 		return typeInfo.createSerializer(conf);
 	}
-	
+
 	public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
-		
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -66,10 +66,10 @@ public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializer
 			output.writeInt(object.getMonthOfYear());
 			output.writeInt(object.getDayOfMonth());
 		}
-		
+
 		@Override
 		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
 			return new LocalDate(input.readInt(), input.readInt(), input.readInt());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 2a437f6..dbf7fd0 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -52,6 +52,17 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<!-- managed version -->
+			<scope>provided</scope>
+			<!-- Avro records can contain JodaTime fields when using logical fields.
+				In order to handle them, we need to add an optional dependency.
+				Users with those Avro records need to add this dependency themselves. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<!-- use a dedicated Scala version to not depend on it -->
 			<artifactId>flink-table_2.11</artifactId>
@@ -97,6 +108,32 @@ under the License.
 		</dependency>
 	</dependencies>
 
+	<profiles>
+		<profile>
+			<!-- Create SQL Client uber jars for releases -->
+			<id>release</id>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-jar-plugin</artifactId>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>jar</goal>
+								</goals>
+								<configuration>
+									<classifier>sql-jar</classifier>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 276257a..c36a4be 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -17,116 +17,157 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * <p>Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
 	/**
-	 * Avro record class.
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
 	private Class<? extends SpecificRecord> recordClazz;
 
 	/**
-	 * Schema for deterministic field order.
+	 * Schema string for deserialization.
+	 */
+	private String schemaString;
+
+	/**
+	 * Avro serialization schema.
 	 */
 	private transient Schema schema;
 
 	/**
-	 * Reader that deserializes byte array into a record.
+	 * Type information describing the result type.
 	 */
-	private transient DatumReader<SpecificRecord> datumReader;
+	private transient RowTypeInfo typeInfo;
 
 	/**
-	 * Input stream to read message from.
+	 * Record to deserialize byte array.
 	 */
-	private transient MutableByteArrayInputStream inputStream;
+	private transient IndexedRecord record;
 
 	/**
-	 * Avro decoder that decodes binary data.
+	 * Reader that deserializes byte array into a record.
 	 */
-	private transient Decoder decoder;
+	private transient DatumReader<IndexedRecord> datumReader;
 
 	/**
-	 * Record to deserialize byte array to.
+	 * Input stream to read message from.
 	 */
-	private SpecificRecord record;
+	private transient MutableByteArrayInputStream inputStream;
 
 	/**
-	 * Type information describing the result type.
+	 * Avro decoder that decodes binary data.
 	 */
-	private transient TypeInformation<Row> typeInfo;
+	private transient Decoder decoder;
 
 	/**
-	 * Creates a Avro deserialization schema for the given record.
+	 * Creates a Avro deserialization schema for the given specific record class. Having the
+	 * concrete Avro record class might improve performance.
 	 *
 	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
 	 */
-	public AvroRowDeserializationSchema(Class<? extends SpecificRecordBase> recordClazz) {
+	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
 		this.recordClazz = recordClazz;
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
-		this.typeInfo = AvroRecordClassConverter.convert(recordClazz);
+		schema = SpecificData.get().getSchema(recordClazz);
+		typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
+		schemaString = schema.toString();
+		record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
+		datumReader = new SpecificDatumReader<>(schema);
+		inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	/**
+	 * Creates a Avro deserialization schema for the given Avro schema string.
+	 *
+	 * @param avroSchemaString Avro schema string to deserialize Avro's record to Flink's row
+	 */
+	public AvroRowDeserializationSchema(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		recordClazz = null;
+		final TypeInformation<?> typeInfo = AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+		Preconditions.checkArgument(typeInfo instanceof RowTypeInfo, "Row type information expected.");
+		this.typeInfo = (RowTypeInfo) typeInfo;
+		schemaString = avroSchemaString;
+		schema = new Schema.Parser().parse(avroSchemaString);
+		record = new GenericData.Record(schema);
+		datumReader = new GenericDatumReader<>(schema);
+		inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 	}
 
 	@Override
 	public Row deserialize(byte[] message) throws IOException {
-		// read record
 		try {
 			inputStream.setBuffer(message);
-			this.record = datumReader.read(record, decoder);
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to deserialize Row.", e);
+			record = datumReader.read(record, decoder);
+			return convertAvroRecordToRow(schema, typeInfo, record);
+		} catch (Exception e) {
+			throw new IOException("Failed to deserialize Avro record.", e);
 		}
-
-		// convert to row
-		final Object row = convertToRow(schema, record);
-		return (Row) row;
-	}
-
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 	}
 
 	@Override
@@ -134,37 +175,175 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return typeInfo;
 	}
 
-	/**
-	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
-	 * Avro's {@link Utf8} fields are converted into regular Java strings.
-	 */
-	private static Object convertToRow(Schema schema, Object recordObj) {
-		if (recordObj instanceof GenericRecord) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
+	// --------------------------------------------------------------------------------------------
+
+	private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+		final List<Schema.Field> fields = schema.getFields();
+		final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
+		final int length = fields.size();
+		final Row row = new Row(length);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			row.setField(i, convertAvroType(field.schema(), fieldInfo[i], record.get(i)));
+		}
+		return row;
+	}
+
+	private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+		// we perform the conversion based on schema information but enriched with pre-computed
+		// type information where useful (e.g., for arrays)
+
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof IndexedRecord) {
+					return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
+				}
+				throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
+			case ENUM:
+			case STRING:
+				return object.toString();
+			case ARRAY:
+				if (info instanceof BasicArrayTypeInfo) {
+					final TypeInformation<?> elementInfo = ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo();
+					return convertToObjectArray(schema.getElementType(), elementInfo, object);
+				} else {
+					final TypeInformation<?> elementInfo = ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo();
+					return convertToObjectArray(schema.getElementType(), elementInfo, object);
+				}
+			case MAP:
+				final MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) info;
+				final Map<String, Object> convertedMap = new HashMap<>();
+				final Map<?, ?> map = (Map<?, ?>) object;
+				for (Map.Entry<?, ?> entry : map.entrySet()) {
+					convertedMap.put(
+						entry.getKey().toString(),
+						convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
+				}
+				return convertedMap;
+			case UNION:
 				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
+				final int size = types.size();
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(1), info, object);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(0), info, object);
+				} else if (size == 1) {
+					return convertAvroType(types.get(0), info, object);
+				} else {
+					// generic type
+					return object;
+				}
+			case FIXED:
+				final byte[] fixedBytes = ((GenericFixed) object).bytes();
+				if (info == Types.BIG_DEC) {
+					return convertToDecimal(schema, fixedBytes);
 				}
-				else {
-					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+				return fixedBytes;
+			case BYTES:
+				final ByteBuffer byteBuffer = (ByteBuffer) object;
+				final byte[] bytes = new byte[byteBuffer.remaining()];
+				byteBuffer.get(bytes);
+				if (info == Types.BIG_DEC) {
+					return convertToDecimal(schema, bytes);
 				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final Row row = new Row(fields.size());
-			final GenericRecord record = (GenericRecord) recordObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
-			}
-			return row;
-		} else if (recordObj instanceof Utf8) {
-			return recordObj.toString();
+				return bytes;
+			case INT:
+				if (info == Types.SQL_DATE) {
+					return convertToDate(object);
+				} else if (info == Types.SQL_TIME) {
+					return convertToTime(object);
+				}
+				return object;
+			case LONG:
+				if (info == Types.SQL_TIMESTAMP) {
+					return convertToTimestamp(object);
+				}
+				return object;
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type: " + schema);
+	}
+
+	private BigDecimal convertToDecimal(Schema schema, byte[] bytes) {
+		final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+		return new BigDecimal(new BigInteger(bytes), decimalType.getScale());
+	}
+
+	private Date convertToDate(Object object) {
+		final long millis;
+		if (object instanceof Integer) {
+			final Integer value = (Integer) object;
+			// adopted from Apache Calcite
+			final long t = (long) value * 86400000L;
+			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else {
+			// use 'provided' Joda time
+			final LocalDate value = (LocalDate) object;
+			millis = value.toDate().getTime();
+		}
+		return new Date(millis);
+	}
+
+	private Time convertToTime(Object object) {
+		final long millis;
+		if (object instanceof Integer) {
+			millis = (Integer) object;
 		} else {
-			return recordObj;
+			// use 'provided' Joda time
+			final LocalTime value = (LocalTime) object;
+			millis = (long) value.get(DateTimeFieldType.millisOfDay());
 		}
+		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
 
+	private Timestamp convertToTimestamp(Object object) {
+		final long millis;
+		if (object instanceof Long) {
+			millis = (Long) object;
+		} else {
+			// use 'provided' Joda time
+			final DateTime value = (DateTime) object;
+			millis = value.toDate().getTime();
+		}
+		return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
+	}
+
+	private Object[] convertToObjectArray(Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+		final List<?> list = (List<?>) object;
+		final Object[] convertedArray = (Object[]) Array.newInstance(
+			elementInfo.getTypeClass(),
+			list.size());
+		for (int i = 0; i < list.size(); i++) {
+			convertedArray[i] = convertAvroType(elementSchema, elementInfo, list.get(i));
+		}
+		return convertedArray;
+	}
+
+	private void writeObject(ObjectOutputStream outputStream) throws IOException {
+		outputStream.writeObject(recordClazz);
+		outputStream.writeUTF(schemaString);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+		recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
+		schemaString = inputStream.readUTF();
+		typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
+		schema = new Schema.Parser().parse(schemaString);
+		if (recordClazz != null) {
+			record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+			datumReader = new SpecificDatumReader<>(schema);
+		} else {
+			record = new GenericData.Record(schema);
+			datumReader = new GenericDatumReader<>(schema);
+		}
+		this.inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
+	}
 }
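
A usage sketch for the two constructors of AvroRowDeserializationSchema above: the class-based variant reads specific records, the schema-string variant reads generic records. The schema string and field names below are made up for illustration; only the API shown in this diff is assumed.

    // specific record class generated by Avro (e.g. the User class from the tests)
    AvroRowDeserializationSchema fromClass = new AvroRowDeserializationSchema(User.class);

    // plain Avro schema string, decoded via generic records
    String schemaString =
        "{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
        + "{\"name\": \"id\", \"type\": \"long\"},"
        + "{\"name\": \"product\", \"type\": \"string\"}]}";
    AvroRowDeserializationSchema fromSchema = new AvroRowDeserializationSchema(schemaString);

    // binary Avro bytes in, Flink Row out (field order follows the schema); may throw IOException
    Row row = fromSchema.deserialize(avroBytes);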

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 41000a6..80f5f1d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -19,12 +19,18 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
@@ -37,19 +43,43 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * <p>Serializes objects that are represented in (nested) Flink rows. It supports types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
 	/**
-	 * Avro record class.
+	 * Used for time conversions from SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for serialization. Might be null if record class is not available.
 	 */
 	private Class<? extends SpecificRecord> recordClazz;
 
 	/**
+	 * Schema string for serialization.
+	 */
+	private String schemaString;
+
+	/**
 	 * Avro serialization schema.
 	 */
 	private transient Schema schema;
@@ -57,93 +87,226 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 	/**
 	 * Writer to serialize Avro record into a byte array.
 	 */
-	private transient DatumWriter<GenericRecord> datumWriter;
+	private transient DatumWriter<IndexedRecord> datumWriter;
 
 	/**
 	 * Output stream to serialize records into byte array.
 	 */
-	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+	private transient ByteArrayOutputStream arrayOutputStream;
 
 	/**
 	 * Low-level class for serialization of Avro values.
 	 */
-	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private transient Encoder encoder;
 
 	/**
-	 * Creates a Avro serialization schema for the given schema.
+	 * Creates an Avro serialization schema for the given specific record class.
 	 *
-	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
 	 */
 	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
 		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.schemaString = schema.toString();
 		this.datumWriter = new SpecificDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	}
+
+	/**
+	 * Creates an Avro serialization schema for the given Avro schema string.
+	 *
+	 * @param avroSchemaString Avro schema string used to serialize Flink's row to Avro's record
+	 */
+	public AvroRowSerializationSchema(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		this.recordClazz = null;
+		this.schemaString = avroSchemaString;
+		try {
+			this.schema = new Schema.Parser().parse(avroSchemaString);
+		} catch (SchemaParseException e) {
+			throw new IllegalArgumentException("Could not parse Avro schema string.", e);
+		}
+		this.datumWriter = new GenericDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public byte[] serialize(Row row) {
-		// convert to record
-		final Object record = convertToRecord(schema, row);
-
-		// write
 		try {
+			// convert to record
+			final GenericRecord record = convertRowToAvroRecord(schema, row);
 			arrayOutputStream.reset();
-			datumWriter.write((GenericRecord) record, encoder);
+			datumWriter.write(record, encoder);
 			encoder.flush();
 			return arrayOutputStream.toByteArray();
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to serialize Row.", e);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to serialize row.", e);
 		}
 	}
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
+	// --------------------------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumWriter = new SpecificDatumWriter<>(schema);
-		this.arrayOutputStream = new ByteArrayOutputStream();
-		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private GenericRecord convertRowToAvroRecord(Schema schema, Row row) {
+		final List<Schema.Field> fields = schema.getFields();
+		final int length = fields.size();
+		final GenericRecord record = new GenericData.Record(schema);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			record.put(i, convertFlinkType(field.schema(), row.getField(i)));
+		}
+		return record;
 	}
 
-	/**
-	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
-	 * Strings are converted into Avro's {@link Utf8} fields.
-	 */
-	private static Object convertToRecord(Schema schema, Object rowObj) {
-		if (rowObj instanceof Row) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
+	private Object convertFlinkType(Schema schema, Object object) {
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof Row) {
+					return convertRowToAvroRecord(schema, (Row) object);
+				}
+				throw new IllegalStateException("Row expected but was: " + object.getClass());
+			case ENUM:
+				return new GenericData.EnumSymbol(schema, object.toString());
+			case ARRAY:
+				final Schema elementSchema = schema.getElementType();
+				final Object[] array = (Object[]) object;
+				final GenericData.Array<Object> convertedArray = new GenericData.Array<>(array.length, schema);
+				for (Object element : array) {
+					convertedArray.add(convertFlinkType(elementSchema, element));
+				}
+				return convertedArray;
+			case MAP:
+				final Map<?, ?> map = (Map<?, ?>) object;
+				final Map<Utf8, Object> convertedMap = new HashMap<>();
+				for (Map.Entry<?, ?> entry : map.entrySet()) {
+					convertedMap.put(
+						new Utf8(entry.getKey().toString()),
+						convertFlinkType(schema.getValueType(), entry.getValue()));
+				}
+				return convertedMap;
+			case UNION:
 				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
+				final int size = types.size();
+				final Schema actualSchema;
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(1);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(0);
+				} else if (size == 1) {
+					actualSchema = types.get(0);
+				} else {
+					// generic type
+					return object;
+				}
+				return convertFlinkType(actualSchema, object);
+			case FIXED:
+				// check for logical type
+				if (object instanceof BigDecimal) {
+					return new GenericData.Fixed(
+						schema,
+						convertFromDecimal(schema, (BigDecimal) object));
 				}
-				else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
-					schema = types.get(0);
+				return new GenericData.Fixed(schema, (byte[]) object);
+			case STRING:
+				return new Utf8(object.toString());
+			case BYTES:
+				// check for logical type
+				if (object instanceof BigDecimal) {
+					return ByteBuffer.wrap(convertFromDecimal(schema, (BigDecimal) object));
 				}
-				else {
-					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, NULL] Given: " + schema);
+				return ByteBuffer.wrap((byte[]) object);
+			case INT:
+				// check for logical types
+				if (object instanceof Date) {
+					return convertFromDate(schema, (Date) object);
+				} else if (object instanceof Time) {
+					return convertFromTime(schema, (Time) object);
 				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final GenericRecord record = new GenericData.Record(schema);
-			final Row row = (Row) rowObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
-			}
-			return record;
-		} else if (rowObj instanceof String) {
-			return new Utf8((String) rowObj);
+				return object;
+			case LONG:
+				// check for logical type
+				if (object instanceof Timestamp) {
+					return convertFromTimestamp(schema, (Timestamp) object);
+				}
+				return object;
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type: " + schema);
+	}
+
+	private byte[] convertFromDecimal(Schema schema, BigDecimal decimal) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType instanceof LogicalTypes.Decimal) {
+			final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+			// rescale to target type
+			final BigDecimal rescaled = decimal.setScale(decimalType.getScale(), BigDecimal.ROUND_UNNECESSARY);
+			// byte array must contain the two's-complement representation of the
+			// unscaled integer value in big-endian byte order
+			return rescaled.unscaledValue().toByteArray();
+		} else {
+			throw new RuntimeException("Unsupported decimal type.");
+		}
+	}
+
+	private int convertFromDate(Schema schema, Date date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.date()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			return (int) (converted / 86400000L);
+		} else {
+			throw new RuntimeException("Unsupported date type.");
+		}
+	}
+
+	private int convertFromTime(Schema schema, Time date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timeMillis()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			return (int) (converted % 86400000L);
+		} else {
+			throw new RuntimeException("Unsupported time type.");
+		}
+	}
+
+	private long convertFromTimestamp(Schema schema, Timestamp date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timestampMillis()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			return time + (long) LOCAL_TZ.getOffset(time);
+		} else {
+			throw new RuntimeException("Unsupported timestamp type.");
+		}
+	}
+
+	private void writeObject(ObjectOutputStream outputStream) throws IOException {
+		outputStream.writeObject(recordClazz);
+		outputStream.writeObject(schemaString); // support for null
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+		recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
+		schemaString = (String) inputStream.readObject();
+		if (recordClazz != null) {
+			schema = SpecificData.get().getSchema(recordClazz);
 		} else {
-			return rowObj;
+			schema = new Schema.Parser().parse(schemaString);
 		}
+		datumWriter = new SpecificDatumWriter<>(schema);
+		arrayOutputStream = new ByteArrayOutputStream();
+		encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 	}
 }
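
A minimal round-trip sketch combining the serialization schema above with the deserialization schema from the previous file; the schema string is made up for illustration.

    String schemaString =
        "{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
        + "{\"name\": \"id\", \"type\": \"long\"},"
        + "{\"name\": \"product\", \"type\": \"string\"}]}";
    AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(schemaString);
    AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(schemaString);

    Row row = new Row(2);
    row.setField(0, 42L);       // maps to Avro long
    row.setField(1, "flink");   // maps to Avro string (Utf8)

    byte[] bytes = serializer.serialize(row);
    Row copy = deserializer.deserialize(bytes);  // equals the original row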

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
deleted file mode 100644
index b7b4871..0000000
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
-
-import java.util.List;
-
-/**
- * Utilities for Avro record class conversion.
- */
-public class AvroRecordClassConverter {
-
-	private AvroRecordClassConverter() {
-		// private
-	}
-
-	/**
-	 * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T extends SpecificRecordBase> TypeInformation<Row> convert(Class<T> avroClass) {
-		final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
-		// determine schema to retrieve deterministic field order
-		final Schema schema = SpecificData.get().getSchema(avroClass);
-		return (TypeInformation<Row>) convertType(avroTypeInfo, schema);
-	}
-
-	/**
-	 * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	private static TypeInformation<?> convertType(TypeInformation<?> extracted, Schema schema) {
-		if (schema.getType() == Schema.Type.RECORD) {
-			final List<Schema.Field> fields = schema.getFields();
-			final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
-
-			final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
-			final String[] names = new String[fields.size()];
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				types[i] = convertType(avroTypeInfo.getTypeAt(field.name()), field.schema());
-				names[i] = field.name();
-			}
-			return new RowTypeInfo(types, names);
-		} else if (extracted instanceof GenericTypeInfo<?>) {
-			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
-			if (genericTypeInfo.getTypeClass() == Utf8.class) {
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			}
-		}
-		return extracted;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
new file mode 100644
index 0000000..6e49df2
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.List;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for representing
+ * objects and converts Avro types into types that are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * classes {@link AvroRowDeserializationSchema} and {@link AvroRowSerializationSchema}.
+ */
+public class AvroSchemaConverter {
+
+	private AvroSchemaConverter() {
+		// private
+	}
+
+	/**
+	 * Converts an Avro class into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroClass Avro specific record that contains schema information
+	 * @return type information matching the schema
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass) {
+		Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+		// determine schema to retrieve deterministic field order
+		final Schema schema = SpecificData.get().getSchema(avroClass);
+		return (TypeInformation<Row>) convertToTypeInfo(schema);
+	}
+
+	/**
+	 * Converts an Avro schema string into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroSchemaString Avro schema definition string
+	 * @return type information matching the schema
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		final Schema schema;
+		try {
+			schema = new Schema.Parser().parse(avroSchemaString);
+		} catch (SchemaParseException e) {
+			throw new IllegalArgumentException("Could not parse Avro schema string.", e);
+		}
+		return (TypeInformation<T>) convertToTypeInfo(schema);
+	}
+
+	private static TypeInformation<?> convertToTypeInfo(Schema schema) {
+		switch (schema.getType()) {
+			case RECORD:
+				final List<Schema.Field> fields = schema.getFields();
+
+				final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+				final String[] names = new String[fields.size()];
+				for (int i = 0; i < fields.size(); i++) {
+					final Schema.Field field = fields.get(i);
+					types[i] = convertToTypeInfo(field.schema());
+					names[i] = field.name();
+				}
+				return Types.ROW_NAMED(names, types);
+			case ENUM:
+				return Types.STRING;
+			case ARRAY:
+				// result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+				return Types.OBJECT_ARRAY(convertToTypeInfo(schema.getElementType()));
+			case MAP:
+				return Types.MAP(Types.STRING, convertToTypeInfo(schema.getValueType()));
+			case UNION:
+				final Schema actualSchema;
+				if (schema.getTypes().size() == 2 && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(1);
+				} else if (schema.getTypes().size() == 2 && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(0);
+				} else if (schema.getTypes().size() == 1) {
+					actualSchema = schema.getTypes().get(0);
+				} else {
+					// use Kryo for serialization
+					return Types.GENERIC(Object.class);
+				}
+				return convertToTypeInfo(actualSchema);
+			case FIXED:
+				// logical decimal type
+				if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+					return Types.BIG_DEC;
+				}
+				// convert fixed size binary data to primitive byte arrays
+				return Types.PRIMITIVE_ARRAY(Types.BYTE);
+			case STRING:
+				// convert Avro's Utf8/CharSequence to String
+				return Types.STRING;
+			case BYTES:
+				// logical decimal type
+				if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+					return Types.BIG_DEC;
+				}
+				return Types.PRIMITIVE_ARRAY(Types.BYTE);
+			case INT:
+				// logical date and time type
+				final LogicalType logicalType = schema.getLogicalType();
+				if (logicalType == LogicalTypes.date()) {
+					return Types.SQL_DATE;
+				} else if (logicalType == LogicalTypes.timeMillis()) {
+					return Types.SQL_TIME;
+				}
+				return Types.INT;
+			case LONG:
+				// logical timestamp type
+				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+					return Types.SQL_TIMESTAMP;
+				}
+				return Types.LONG;
+			case FLOAT:
+				return Types.FLOAT;
+			case DOUBLE:
+				return Types.DOUBLE;
+			case BOOLEAN:
+				return Types.BOOLEAN;
+			case NULL:
+				return Types.VOID;
+		}
+		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+	}
+}
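
A sketch of how the converter above maps Avro logical types to Flink types; the schema string is made up, and the result follows the switch cases in convertToTypeInfo.

    // long -> LONG, timestamp-millis -> SQL_TIMESTAMP, decimal -> BIG_DEC
    TypeInformation<Row> type = AvroSchemaConverter.convertToTypeInfo(
        "{\"type\": \"record\", \"name\": \"Event\", \"fields\": ["
        + "{\"name\": \"id\", \"type\": \"long\"},"
        + "{\"name\": \"ts\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},"
        + "{\"name\": \"amount\", \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\","
        + " \"precision\": 10, \"scale\": 2}}]}");
    // type is ROW_NAMED(id: LONG, ts: SQL_TIMESTAMP, amount: BIG_DEC)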

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 5744abc..b871dbc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -33,6 +33,11 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.joda.time.Chronology;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -103,4 +108,69 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			return sParser.parse(schemaAsString);
 		}
 	}
+
+	/**
+	 * Avro logical types use Joda-Time's LocalDate, but Kryo is unable to serialize it
+	 * properly (especially visible after calling the toString() method).
+	 */
+	public static class JodaLocalDateSerializer extends Serializer<LocalDate> {
+
+		public JodaLocalDateSerializer() {
+			setImmutable(true);
+		}
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalDate localDate) {
+			output.writeInt(localDate.getYear());
+			output.writeInt(localDate.getMonthOfYear());
+			output.writeInt(localDate.getDayOfMonth());
+
+			final Chronology chronology = localDate.getChronology();
+			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+				throw new RuntimeException("Unsupported chronology: " + chronology);
+			}
+		}
+
+		@Override
+		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> aClass) {
+			final int y = input.readInt();
+			final int m = input.readInt();
+			final int d = input.readInt();
+
+			return new LocalDate(
+				y,
+				m,
+				d,
+				null);
+		}
+	}
+
+	/**
+	 * Avro logical types use Joda-Time's LocalTime, but Kryo is unable to serialize it
+	 * properly (especially visible after calling the toString() method).
+	 */
+	public static class JodaLocalTimeSerializer extends Serializer<LocalTime> {
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalTime object) {
+			final int time = object.getMillisOfDay();
+			output.writeInt(time, true);
+
+			final Chronology chronology = object.getChronology();
+			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+				throw new RuntimeException("Unsupported chronology: " + chronology);
+			}
+		}
+
+		@Override
+		public LocalTime read(Kryo kryo, Input input, Class<LocalTime> type) {
+			final int time = input.readInt(true);
+			return new LocalTime(time, ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
+		}
+
+		@Override
+		public LocalTime copy(Kryo kryo, LocalTime original) {
+			return new LocalTime(original);
+		}
+	}
 }
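
The two Joda serializers above are wired into Flink's Kryo configuration elsewhere; as an illustrative assumption only, a manual registration on the execution config would look like this.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().registerTypeWithKryoSerializer(
        LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(
        LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);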

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
index f07a22f..611d714 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
@@ -20,14 +20,15 @@ package org.apache.flink.table.descriptors;
 
 import org.apache.flink.util.Preconditions;
 
-import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
 
 /**
  * Format descriptor for Apache Avro records.
  */
 public class Avro extends FormatDescriptor {
 
-	private Class<? extends SpecificRecordBase> recordClass;
+	private Class<? extends SpecificRecord> recordClass;
+	private String avroSchema;
 
 	/**
 	 * Format descriptor for Apache Avro records.
@@ -37,17 +38,28 @@ public class Avro extends FormatDescriptor {
 	}
 
 	/**
-	 * Sets the class of the Avro specific record. Required.
+	 * Sets the class of the Avro specific record.
 	 *
 	 * @param recordClass class of the Avro record.
 	 */
-	public Avro recordClass(Class<? extends SpecificRecordBase> recordClass) {
+	public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
 		Preconditions.checkNotNull(recordClass);
 		this.recordClass = recordClass;
 		return this;
 	}
 
 	/**
+	 * Sets the Avro schema for specific or generic Avro records.
+	 *
+	 * @param avroSchema Avro schema string
+	 */
+	public Avro avroSchema(String avroSchema) {
+		Preconditions.checkNotNull(avroSchema);
+		this.avroSchema = avroSchema;
+		return this;
+	}
+
+	/**
 	 * Internal method for format properties conversion.
 	 */
 	@Override
@@ -55,5 +67,8 @@ public class Avro extends FormatDescriptor {
 		if (null != recordClass) {
 			properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
 		}
+		if (null != avroSchema) {
+			properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
+		}
 	}
 }
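
The descriptor above now accepts either a specific record class or a plain schema string; a short sketch (the schema string is made up, and passing the descriptor to a table source via withFormat(...) is assumed rather than shown in this diff).

    // generated specific record class
    new Avro().recordClass(User.class);

    // or, mutually exclusive, a plain Avro schema string for generic records
    new Avro().avroSchema(
        "{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
        + "{\"name\": \"id\", \"type\": \"long\"}]}");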

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
index 8a72abf..c66dcc7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.table.api.ValidationException;
+
 /**
  * Validator for {@link Avro}.
  */
@@ -25,10 +27,21 @@ public class AvroValidator extends FormatDescriptorValidator {
 
 	public static final String FORMAT_TYPE_VALUE = "avro";
 	public static final String FORMAT_RECORD_CLASS = "format.record-class";
+	public static final String FORMAT_AVRO_SCHEMA = "format.avro-schema";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
 		super.validate(properties);
-		properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+		final boolean hasRecordClass = properties.containsKey(FORMAT_RECORD_CLASS);
+		final boolean hasAvroSchema = properties.containsKey(FORMAT_AVRO_SCHEMA);
+		if (hasRecordClass && hasAvroSchema) {
+			throw new ValidationException("A definition of both an Avro record class and an Avro schema is not allowed.");
+		} else if (hasRecordClass) {
+			properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+		} else if (hasAvroSchema) {
+			properties.validateString(FORMAT_AVRO_SCHEMA, false, 1);
+		} else {
+			throw new ValidationException("A definition of an Avro specific record class or Avro schema is required.");
+		}
 	}
 }
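
In terms of the validated keys, the rules above accept exactly one of the following two layouts next to format.type=avro (plain property syntax used only for illustration).

    format.type=avro
    format.record-class=org.apache.flink.formats.avro.generated.User

    format.type=avro
    format.avro-schema={"type": "record", "name": "Order", "fields": [{"name": "id", "type": "long"}]}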

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index caa6e0d..dd901d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
@@ -31,12 +32,18 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Assert;
 
 import java.io.File;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * IT cases for the {@link AvroOutputFormat}.
@@ -72,14 +79,14 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 
 		//output the data with AvroOutputFormat for specific user type
 		DataSet<User> specificUser = input.map(new ConvertToUser());
-		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<>(User.class);
 		avroOutputFormat.setCodec(Codec.SNAPPY); // FLINK-4771: use a codec
 		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
 		specificUser.write(avroOutputFormat, outputPath1);
 
 		//output the data with AvroOutputFormat for reflect user type
 		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
-		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+		reflectiveUser.write(new AvroOutputFormat<>(ReflectiveUser.class), outputPath2);
 
 		env.execute();
 	}
@@ -92,17 +99,17 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		if (file1.isDirectory()) {
 			output1 = file1.listFiles();
 			// check for avro ext in dir.
-			for (File avroOutput : output1) {
+			for (File avroOutput : Objects.requireNonNull(output1)) {
 				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
 			}
 		} else {
 			output1 = new File[] {file1};
 		}
-		List<String> result1 = new ArrayList<String>();
-		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		List<String> result1 = new ArrayList<>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<>(User.class);
 		for (File avroOutput : output1) {
 
-			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+			DataFileReader<User> dataFileReader1 = new DataFileReader<>(avroOutput, userDatumReader1);
 			while (dataFileReader1.hasNext()) {
 				User user = dataFileReader1.next();
 				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
@@ -120,10 +127,10 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		} else {
 			output2 = new File[] {file2};
 		}
-		List<String> result2 = new ArrayList<String>();
-		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-		for (File avroOutput : output2) {
-			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+		List<String> result2 = new ArrayList<>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<>(ReflectiveUser.class);
+		for (File avroOutput : Objects.requireNonNull(output2)) {
+			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<>(avroOutput, userDatumReader2);
 			while (dataFileReader2.hasNext()) {
 				ReflectiveUser user = dataFileReader2.next();
 				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
@@ -138,7 +145,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 	private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
 
 		@Override
-		public User map(Tuple3<String, Integer, String> value) throws Exception {
+		public User map(Tuple3<String, Integer, String> value) {
 			User user = new User();
 			user.setName(value.f0);
 			user.setFavoriteNumber(value.f1);
@@ -148,6 +155,16 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeArrayBoolean(Collections.emptyList());
 			user.setTypeEnum(Colors.BLUE);
 			user.setTypeMap(Collections.emptyMap());
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			return user;
 		}
 	}
@@ -155,7 +172,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 	private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
 
 		@Override
-		public ReflectiveUser map(User value) throws Exception {
+		public ReflectiveUser map(User value) {
 			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index b5ad564..3397b8e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 
 import org.apache.avro.Schema;
@@ -29,6 +30,9 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
@@ -38,6 +42,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -50,7 +56,7 @@ import static org.junit.Assert.fail;
 public class AvroOutputFormatTest {
 
 	@Test
-	public void testSetCodec() throws Exception {
+	public void testSetCodec() {
 		// given
 		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
 
@@ -64,7 +70,7 @@ public class AvroOutputFormatTest {
 	}
 
 	@Test
-	public void testSetCodecError() throws Exception {
+	public void testSetCodecError() {
 		// given
 		boolean error = false;
 		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
@@ -111,6 +117,7 @@ public class AvroOutputFormatTest {
 			// then
 			Object o = ois.readObject();
 			assertTrue(o instanceof AvroOutputFormat);
+			@SuppressWarnings("unchecked")
 			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
 			final AvroOutputFormat.Codec restoredCodec = (AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec");
 			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
@@ -162,6 +169,17 @@ public class AvroOutputFormatTest {
 			user.setTypeArrayBoolean(Collections.emptyList());
 			user.setTypeEnum(Colors.BLUE);
 			user.setTypeMap(Collections.emptyMap());
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+
 			outputFormat.writeRecord(user);
 		}
 		outputFormat.close();
@@ -189,7 +207,6 @@ public class AvroOutputFormatTest {
 		//cleanup
 		FileSystem fs = FileSystem.getLocalFileSystem();
 		fs.delete(outputPath, false);
-
 	}
 
 	private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {


[2/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats

Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 92d2c31..84849a6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
@@ -47,6 +48,9 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,6 +59,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -100,15 +106,15 @@ public class AvroRecordInputFormatTest {
 	private Schema userSchema = new User().getSchema();
 
 	public static void writeTestFile(File testFile) throws IOException {
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		ArrayList<CharSequence> stringArray = new ArrayList<>();
 		stringArray.add(TEST_ARRAY_STRING_1);
 		stringArray.add(TEST_ARRAY_STRING_2);
 
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		ArrayList<Boolean> booleanArray = new ArrayList<>();
 		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
 		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
 
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		HashMap<CharSequence, Long> longMap = new HashMap<>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
 
@@ -130,6 +136,16 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeEnum(TEST_ENUM_COLOR);
 		user1.setTypeMap(longMap);
 		user1.setTypeNested(addr);
+		user1.setTypeBytes(ByteBuffer.allocate(10));
+		user1.setTypeDate(LocalDate.parse("2014-03-01"));
+		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(123456L);
+		// 20.00
+		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		// 20.00
+		user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 		// Construct via builder
 		User user2 = User.newBuilder()
@@ -140,20 +156,30 @@ public class AvroRecordInputFormatTest {
 				.setTypeDoubleTest(1.337d)
 				.setTypeNullTest(null)
 				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeArrayString(new ArrayList<>())
+				.setTypeArrayBoolean(new ArrayList<>())
 				.setTypeNullableArray(null)
 				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeMap(new HashMap<>())
 				.setTypeFixed(null)
 				.setTypeUnion(null)
 				.setTypeNested(
 						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
 								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
 								.build())
+				.setTypeBytes(ByteBuffer.allocate(10))
+				.setTypeDate(LocalDate.parse("2014-03-01"))
+				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+				.setTypeTimeMicros(123456)
+				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(123456L)
+				// 20.00
+				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+				// 20.00
+				.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
 		dataFileWriter.create(user1.getSchema(), testFile);
 		dataFileWriter.append(user1);
 		dataFileWriter.append(user2);
@@ -167,14 +193,13 @@ public class AvroRecordInputFormatTest {
 	}
 
 	/**
-	 * Test if the AvroInputFormat is able to properly read data from an avro file.
-	 * @throws IOException
+	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
 	 */
 	@Test
-	public void testDeserialisation() throws IOException {
+	public void testDeserialization() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(1);
@@ -216,14 +241,13 @@ public class AvroRecordInputFormatTest {
 	}
 
 	/**
-	 * Test if the AvroInputFormat is able to properly read data from an avro file.
-	 * @throws IOException
+	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
 	 */
 	@Test
-	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+	public void testDeserializationReuseAvroRecordFalse() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.setReuseAvroValue(false);
 
 		format.configure(parameters);
@@ -294,7 +318,7 @@ public class AvroRecordInputFormatTest {
 			ExecutionConfig ec = new ExecutionConfig();
 			assertEquals(GenericTypeInfo.class, te.getClass());
 
-			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());
 
 			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
 			assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
@@ -327,7 +351,7 @@ public class AvroRecordInputFormatTest {
 	@Test
 	public void testDeserializeToSpecificType() throws IOException {
 
-		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+		DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);
 
 		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
 			User rec = dataFileReader.next();
@@ -365,15 +389,12 @@ public class AvroRecordInputFormatTest {
 	/**
 	 * Test if the AvroInputFormat is able to properly read data from an Avro
 	 * file as a GenericRecord.
-	 *
-	 * @throws IOException
 	 */
 	@Test
-	public void testDeserialisationGenericRecord() throws IOException {
+	public void testDeserializationGenericRecord() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
 
 		doTestDeserializationGenericRecord(format, parameters);
 	}
@@ -440,17 +461,17 @@ public class AvroRecordInputFormatTest {
 	 * @throws IOException if there is an error
 	 */
 	@Test
-	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+	public void testDeserializationGenericRecordReuseAvroValueFalse() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
 		format.configure(parameters);
 		format.setReuseAvroValue(false);
 
 		doTestDeserializationGenericRecord(format, parameters);
 	}
 
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	@After
 	public void deleteFiles() {
 		testFile.delete();

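A quick worked example of the decimal fixtures used throughout these tests may help: BigDecimal.valueOf(2000, 2) is 20.00 (unscaled value 2000, scale 2), and Avro's decimal logical type stores only the two's-complement big-endian bytes of the unscaled value, the scale being carried by the schema. The sketch below is illustrative only; the class and variable names are not part of the patch.

import java.math.BigDecimal;
import java.util.Arrays;

public class DecimalFixtureSketch {
	public static void main(String[] args) {
		// 20.00 expressed as unscaled value 2000 with scale 2.
		BigDecimal twenty = BigDecimal.valueOf(2000, 2);

		// Avro's decimal logical type serializes only the unscaled value as
		// two's-complement big-endian bytes; the scale lives in the schema.
		byte[] unscaled = twenty.unscaledValue().toByteArray();

		// 2000 == 0x07D0, so this prints [7, -48] -- the same bytes that later
		// show up (also in escaped-string form) in the expected output of
		// AvroTypeExtractionTest below.
		System.out.println(Arrays.toString(unscaled));
	}
}
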
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 1d98c14..de50f27 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -23,8 +23,9 @@ import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,8 +38,8 @@ import static org.junit.Assert.assertEquals;
 public class AvroRowDeSerializationSchemaTest {
 
 	@Test
-	public void testSerializeDeserializeSimpleRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testSpecificSerializeDeserializeFromClass() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -50,14 +51,13 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializeSimpleRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testSpecificSerializeDeserializeFromSchema() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
-		serializationSchema.serialize(testData.f2);
-		serializationSchema.serialize(testData.f2);
 		final byte[] bytes = serializationSchema.serialize(testData.f2);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
@@ -65,27 +65,27 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testDeserializeRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testGenericSerializeDeserialize() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
 
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		deserializationSchema.deserialize(bytes);
-		deserializationSchema.deserialize(bytes);
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
-		assertEquals(testData.f2, actual);
+		assertEquals(testData.f1, actual);
 	}
 
 	@Test
-	public void testSerializeDeserializeComplexRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificSerializeFromClassSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
 
+		serializationSchema.serialize(testData.f2);
+		serializationSchema.serialize(testData.f2);
 		final byte[] bytes = serializationSchema.serialize(testData.f2);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
@@ -93,11 +93,12 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
 		serializationSchema.serialize(testData.f2);
 		serializationSchema.serialize(testData.f2);
@@ -108,8 +109,23 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testDeserializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testGenericSerializeSeveralTimes() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+		serializationSchema.serialize(testData.f1);
+		serializationSchema.serialize(testData.f1);
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f1, actual);
+	}
+
+	@Test
+	public void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -123,25 +139,66 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializability() throws IOException, ClassNotFoundException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
-		byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
-		byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
 
-		AvroRowSerializationSchema serCopy =
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testGenericDeserializeSeveralTimes() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f1, actual);
+	}
+
+	@Test
+	public void testSerializability() throws Exception {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
+
+		// from class
+		final AvroRowSerializationSchema classSer = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema classDeser = new AvroRowDeserializationSchema(testData.f0);
+		testSerializability(classSer, classDeser, testData.f2);
+
+		// from schema string
+		final AvroRowSerializationSchema schemaSer = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema schemaDeser = new AvroRowDeserializationSchema(schemaString);
+		testSerializability(schemaSer, schemaDeser, testData.f2);
+	}
+
+	private void testSerializability(AvroRowSerializationSchema ser, AvroRowDeserializationSchema deser, Row data) throws Exception {
+		final byte[] serBytes = InstantiationUtil.serializeObject(ser);
+		final byte[] deserBytes = InstantiationUtil.serializeObject(deser);
+
+		final AvroRowSerializationSchema serCopy =
 			InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
-		AvroRowDeserializationSchema deserCopy =
+		final AvroRowDeserializationSchema deserCopy =
 			InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
 
-		final byte[] bytes = serCopy.serialize(testData.f2);
+		final byte[] bytes = serCopy.serialize(data);
 		deserCopy.deserialize(bytes);
 		deserCopy.deserialize(bytes);
 		final Row actual = deserCopy.deserialize(bytes);
 
-		assertEquals(testData.f2, actual);
+		assertEquals(data, actual);
 	}
 }

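To summarize the API surface these renamed tests now cover, here is a hedged usage sketch of the two construction paths for the row (de)serialization schemas: from a generated specific-record class and from an Avro schema JSON string. It reuses the AvroTestUtils helper called by the tests above; class and variable names are illustrative, not part of the patch.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.formats.avro.utils.AvroTestUtils;
import org.apache.flink.types.Row;

import org.apache.avro.specific.SpecificRecord;

public class AvroRowSchemaUsageSketch {
	public static void main(String[] args) throws Exception {
		// f0 = generated record class, f1 = specific record, f2 = equivalent Row.
		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
			AvroTestUtils.getSpecificTestData();

		// Construction from the generated specific-record class ...
		AvroRowSerializationSchema classSer = new AvroRowSerializationSchema(testData.f0);
		AvroRowDeserializationSchema classDeser = new AvroRowDeserializationSchema(testData.f0);

		// ... or from an Avro schema JSON string, as the generic-record tests do.
		String schemaString = testData.f1.getSchema().toString();
		AvroRowSerializationSchema schemaSer = new AvroRowSerializationSchema(schemaString);
		AvroRowDeserializationSchema schemaDeser = new AvroRowDeserializationSchema(schemaString);

		// Row -> Avro bytes -> Row round trip; both pairs should agree.
		byte[] bytes = classSer.serialize(testData.f2);
		Row back = classDeser.deserialize(bytes);
		System.out.println(back.equals(schemaDeser.deserialize(schemaSer.serialize(testData.f2))));
	}
}
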
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index 40a84f9..fee81a8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -25,11 +25,15 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,6 +41,8 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
@@ -67,7 +73,7 @@ public class AvroSplittableInputFormatTest {
 	static final String TEST_MAP_KEY2 = "KEY 2";
 	static final long TEST_MAP_VALUE2 = 17554L;
 
-	static final Integer TEST_NUM = new Integer(239);
+	static final Integer TEST_NUM = 239;
 	static final String TEST_STREET = "Baker Street";
 	static final String TEST_CITY = "London";
 	static final String TEST_STATE = "London";
@@ -79,20 +85,20 @@ public class AvroSplittableInputFormatTest {
 	public void createFiles() throws IOException {
 		testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
 
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		ArrayList<CharSequence> stringArray = new ArrayList<>();
 		stringArray.add(TEST_ARRAY_STRING_1);
 		stringArray.add(TEST_ARRAY_STRING_2);
 
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		ArrayList<Boolean> booleanArray = new ArrayList<>();
 		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
 		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
 
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		HashMap<CharSequence, Long> longMap = new HashMap<>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
 
 		Address addr = new Address();
-		addr.setNum(new Integer(TEST_NUM));
+		addr.setNum(TEST_NUM);
 		addr.setStreet(TEST_STREET);
 		addr.setCity(TEST_CITY);
 		addr.setState(TEST_STATE);
@@ -108,6 +114,16 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeEnum(TEST_ENUM_COLOR);
 		user1.setTypeMap(longMap);
 		user1.setTypeNested(addr);
+		user1.setTypeBytes(ByteBuffer.allocate(10));
+		user1.setTypeDate(LocalDate.parse("2014-03-01"));
+		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(123456L);
+		// 20.00
+		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		// 20.00
+		user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 		// Construct via builder
 		User user2 = User.newBuilder()
@@ -118,20 +134,30 @@ public class AvroSplittableInputFormatTest {
 				.setTypeDoubleTest(1.337d)
 				.setTypeNullTest(null)
 				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeArrayString(new ArrayList<>())
+				.setTypeArrayBoolean(new ArrayList<>())
 				.setTypeNullableArray(null)
 				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeMap(new HashMap<>())
 				.setTypeFixed(new Fixed16())
 				.setTypeUnion(123L)
 				.setTypeNested(
 						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
 								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
 								.build())
+				.setTypeBytes(ByteBuffer.allocate(10))
+				.setTypeDate(LocalDate.parse("2014-03-01"))
+				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+				.setTypeTimeMicros(123456)
+				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(123456L)
+				// 20.00
+				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+				// 20.00
+				.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
 		dataFileWriter.create(user1.getSchema(), testFile);
 		dataFileWriter.append(user1);
 		dataFileWriter.append(user2);
@@ -148,12 +174,22 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeEnum(TEST_ENUM_COLOR);
 			user.setTypeMap(longMap);
 			Address address = new Address();
-			address.setNum(new Integer(TEST_NUM));
+			address.setNum(TEST_NUM);
 			address.setStreet(TEST_STREET);
 			address.setCity(TEST_CITY);
 			address.setState(TEST_STATE);
 			address.setZip(TEST_ZIP);
 			user.setTypeNested(address);
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 			dataFileWriter.append(user);
 		}
@@ -164,7 +200,7 @@ public class AvroSplittableInputFormatTest {
 	public void testSplittedIF() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -182,10 +218,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -196,7 +232,7 @@ public class AvroSplittableInputFormatTest {
 
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.configure(parameters);
 
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -228,10 +264,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -242,7 +278,7 @@ public class AvroSplittableInputFormatTest {
 
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.configure(parameters);
 
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -274,10 +310,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -287,11 +323,23 @@ public class AvroSplittableInputFormatTest {
 
 	This dependency needs to be added
 
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.6</version>
-        </dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro-mapred</artifactId>
+			<version>1.7.6</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility_2.11</artifactId>
+			<version>1.6-SNAPSHOT</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>16.0</version>
+		</dependency>
 
 	@Test
 	public void testHadoop() throws Exception {
@@ -314,10 +362,11 @@ public class AvroSplittableInputFormatTest {
 			}
 			i++;
 		}
-		System.out.println("Status "+Arrays.toString(elementsPerSplit));
-	} **/
+		System.out.println("Status " + Arrays.toString(elementsPerSplit));
+	} */
 
 	@After
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void deleteFiles() {
 		testFile.delete();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 87e169b..49ef985 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
@@ -28,12 +29,17 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,6 +55,7 @@ import static org.junit.Assert.fail;
  * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
  */
 public class EncoderDecoderTest {
+
 	@Test
 	public void testComplexStringsDirecty() {
 		try {
@@ -93,56 +100,56 @@ public class EncoderDecoderTest {
 	@Test
 	public void testPrimitiveTypes() {
 
-		testObjectSerialization(new Boolean(true));
-		testObjectSerialization(new Boolean(false));
-
-		testObjectSerialization(Byte.valueOf((byte) 0));
-		testObjectSerialization(Byte.valueOf((byte) 1));
-		testObjectSerialization(Byte.valueOf((byte) -1));
-		testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
-		testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-
-		testObjectSerialization(Short.valueOf((short) 0));
-		testObjectSerialization(Short.valueOf((short) 1));
-		testObjectSerialization(Short.valueOf((short) -1));
-		testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
-		testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-
-		testObjectSerialization(Integer.valueOf(0));
-		testObjectSerialization(Integer.valueOf(1));
-		testObjectSerialization(Integer.valueOf(-1));
-		testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
-		testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-
-		testObjectSerialization(Long.valueOf(0));
-		testObjectSerialization(Long.valueOf(1));
-		testObjectSerialization(Long.valueOf(-1));
-		testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
-		testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-
-		testObjectSerialization(Float.valueOf(0));
-		testObjectSerialization(Float.valueOf(1));
-		testObjectSerialization(Float.valueOf(-1));
-		testObjectSerialization(Float.valueOf((float) Math.E));
-		testObjectSerialization(Float.valueOf((float) Math.PI));
-		testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
-		testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
-		testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
-		testObjectSerialization(Float.valueOf(Float.NaN));
-		testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
-		testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-
-		testObjectSerialization(Double.valueOf(0));
-		testObjectSerialization(Double.valueOf(1));
-		testObjectSerialization(Double.valueOf(-1));
-		testObjectSerialization(Double.valueOf(Math.E));
-		testObjectSerialization(Double.valueOf(Math.PI));
-		testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
-		testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
-		testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
-		testObjectSerialization(Double.valueOf(Double.NaN));
-		testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
-		testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+		testObjectSerialization(Boolean.TRUE);
+		testObjectSerialization(Boolean.FALSE);
+
+		testObjectSerialization((byte) 0);
+		testObjectSerialization((byte) 1);
+		testObjectSerialization((byte) -1);
+		testObjectSerialization(Byte.MIN_VALUE);
+		testObjectSerialization(Byte.MAX_VALUE);
+
+		testObjectSerialization((short) 0);
+		testObjectSerialization((short) 1);
+		testObjectSerialization((short) -1);
+		testObjectSerialization(Short.MIN_VALUE);
+		testObjectSerialization(Short.MAX_VALUE);
+
+		testObjectSerialization(0);
+		testObjectSerialization(1);
+		testObjectSerialization(-1);
+		testObjectSerialization(Integer.MIN_VALUE);
+		testObjectSerialization(Integer.MAX_VALUE);
+
+		testObjectSerialization(0L);
+		testObjectSerialization(1L);
+		testObjectSerialization((long) -1);
+		testObjectSerialization(Long.MIN_VALUE);
+		testObjectSerialization(Long.MAX_VALUE);
+
+		testObjectSerialization(0f);
+		testObjectSerialization(1f);
+		testObjectSerialization((float) -1);
+		testObjectSerialization((float) Math.E);
+		testObjectSerialization((float) Math.PI);
+		testObjectSerialization(Float.MIN_VALUE);
+		testObjectSerialization(Float.MAX_VALUE);
+		testObjectSerialization(Float.MIN_NORMAL);
+		testObjectSerialization(Float.NaN);
+		testObjectSerialization(Float.NEGATIVE_INFINITY);
+		testObjectSerialization(Float.POSITIVE_INFINITY);
+
+		testObjectSerialization(0d);
+		testObjectSerialization(1d);
+		testObjectSerialization((double) -1);
+		testObjectSerialization(Math.E);
+		testObjectSerialization(Math.PI);
+		testObjectSerialization(Double.MIN_VALUE);
+		testObjectSerialization(Double.MAX_VALUE);
+		testObjectSerialization(Double.MIN_NORMAL);
+		testObjectSerialization(Double.NaN);
+		testObjectSerialization(Double.NEGATIVE_INFINITY);
+		testObjectSerialization(Double.POSITIVE_INFINITY);
 
 		testObjectSerialization("");
 		testObjectSerialization("abcdefg");
@@ -209,7 +216,7 @@ public class EncoderDecoderTest {
 
 		// object with collection
 		{
-			ArrayList<String> list = new ArrayList<String>();
+			ArrayList<String> list = new ArrayList<>();
 			list.add("A");
 			list.add("B");
 			list.add("C");
@@ -221,7 +228,7 @@ public class EncoderDecoderTest {
 
 		// object with empty collection
 		{
-			ArrayList<String> list = new ArrayList<String>();
+			ArrayList<String> list = new ArrayList<>();
 			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
 		}
 	}
@@ -235,7 +242,7 @@ public class EncoderDecoderTest {
 	public void testGeneratedObjectWithNullableFields() {
 		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
 		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
-		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+		Map<CharSequence, Long> map = new HashMap<>();
 		map.put("1", 1L);
 		map.put("2", 2L);
 		map.put("3", 3L);
@@ -243,11 +250,31 @@ public class EncoderDecoderTest {
 		byte[] b = new byte[16];
 		new Random().nextBytes(b);
 		Fixed16 f = new Fixed16(b);
-		Address addr = new Address(new Integer(239), "6th Main", "Bangalore",
-				"Karnataka", "560075");
-		User user = new User("Freudenreich", 1337, "macintosh gray",
-				1234567890L, 3.1415926, null, true, strings, bools, null,
-				Colors.GREEN, map, f, new Boolean(true), addr);
+		Address addr = new Address(239, "6th Main", "Bangalore", "Karnataka", "560075");
+		User user = new User(
+			"Freudenreich",
+			1337,
+			"macintosh gray",
+			1234567890L,
+			3.1415926,
+			null,
+			true,
+			strings,
+			bools,
+			null,
+			Colors.GREEN,
+			map,
+			f,
+			Boolean.TRUE,
+			addr,
+			ByteBuffer.wrap(b),
+			LocalDate.parse("2014-03-01"),
+			LocalTime.parse("12:12:12"),
+			123456,
+			DateTime.parse("2014-03-01T12:12:12.321Z"),
+			123456L,
+			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
+			new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
 
 		testObjectSerialization(user);
 	}
@@ -301,7 +328,7 @@ public class EncoderDecoderTest {
 
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+				ReflectDatumWriter<X> writer = new ReflectDatumWriter<>(clazz);
 
 				writer.write(obj, encoder);
 				dataOut.flush();
@@ -309,7 +336,7 @@ public class EncoderDecoderTest {
 			}
 
 			byte[] data = baos.toByteArray();
-			X result = null;
+			X result;
 
 			// deserialize
 			{
@@ -320,7 +347,7 @@ public class EncoderDecoderTest {
 
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+				ReflectDatumReader<X> reader = new ReflectDatumReader<>(clazz);
 
 				// create a reuse object if possible, otherwise we have no reuse object
 				X reuse = null;
@@ -328,7 +355,9 @@ public class EncoderDecoderTest {
 					@SuppressWarnings("unchecked")
 					X test = (X) obj.getClass().newInstance();
 					reuse = test;
-				} catch (Throwable t) {}
+				} catch (Throwable t) {
+					// do nothing
+				}
 
 				result = reader.read(reuse, decoder);
 			}
@@ -427,7 +456,7 @@ public class EncoderDecoderTest {
 		public ComplexNestedObject1(int offInit) {
 			this.doubleValue = 6293485.6723 + offInit;
 
-			this.stringList = new ArrayList<String>();
+			this.stringList = new ArrayList<>();
 			this.stringList.add("A" + offInit);
 			this.stringList.add("somewhat" + offInit);
 			this.stringList.add("random" + offInit);
@@ -458,7 +487,7 @@ public class EncoderDecoderTest {
 		public ComplexNestedObject2(boolean init) {
 			this.longValue = 46547;
 
-			this.theMap = new HashMap<String, ComplexNestedObject1>();
+			this.theMap = new HashMap<>();
 			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
 			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
 			this.theMap.put("43L", new ComplexNestedObject1(9876543));

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
new file mode 100644
index 0000000..be0ddc4
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+	@Test
+	public void testAvroClassConversion() {
+		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
+	}
+
+	@Test
+	public void testAvroSchemaConversion() {
+		final String schema = User.getClassSchema().toString(true);
+		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
+	}
+
+	private void validateUserSchema(TypeInformation<?> actual) {
+		final TypeInformation<Row> address = Types.ROW_NAMED(
+			new String[]{
+				"num",
+				"street",
+				"city",
+				"state",
+				"zip"},
+			Types.INT,
+			Types.STRING,
+			Types.STRING,
+			Types.STRING,
+			Types.STRING);
+
+		final TypeInformation<Row> user = Types.ROW_NAMED(
+			new String[] {
+				"name",
+				"favorite_number",
+				"favorite_color",
+				"type_long_test",
+				"type_double_test",
+				"type_null_test",
+				"type_bool_test",
+				"type_array_string",
+				"type_array_boolean",
+				"type_nullable_array",
+				"type_enum",
+				"type_map",
+				"type_fixed",
+				"type_union",
+				"type_nested",
+				"type_bytes",
+				"type_date",
+				"type_time_millis",
+				"type_time_micros",
+				"type_timestamp_millis",
+				"type_timestamp_micros",
+				"type_decimal_bytes",
+				"type_decimal_fixed"},
+			Types.STRING,
+			Types.INT,
+			Types.STRING,
+			Types.LONG,
+			Types.DOUBLE,
+			Types.VOID,
+			Types.BOOLEAN,
+			Types.OBJECT_ARRAY(Types.STRING),
+			Types.OBJECT_ARRAY(Types.BOOLEAN),
+			Types.OBJECT_ARRAY(Types.STRING),
+			Types.STRING,
+			Types.MAP(Types.STRING, Types.LONG),
+			Types.PRIMITIVE_ARRAY(Types.BYTE),
+			Types.GENERIC(Object.class),
+			address,
+			Types.PRIMITIVE_ARRAY(Types.BYTE),
+			Types.SQL_DATE,
+			Types.SQL_TIME,
+			Types.INT,
+			Types.SQL_TIMESTAMP,
+			Types.LONG,
+			Types.BIG_DEC,
+			Types.BIG_DEC);
+
+		assertEquals(user, actual);
+
+		final RowTypeInfo userRowInfo = (RowTypeInfo) user;
+		assertTrue(userRowInfo.schemaEquals(actual));
+	}
+}

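A minimal usage sketch of the converter under test, assuming only the two convertToTypeInfo overloads exercised above; the printed row type should map the logical types as validateUserSchema expects (date to SQL_DATE, time-millis to SQL_TIME, timestamp-millis to SQL_TIMESTAMP, decimals to BIG_DEC). Class and variable names here are illustrative.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

public class SchemaConversionSketch {
	public static void main(String[] args) {
		// From the generated specific-record class ...
		TypeInformation<?> fromClass = AvroSchemaConverter.convertToTypeInfo(User.class);

		// ... or from the Avro schema as a JSON string.
		TypeInformation<?> fromSchema =
			AvroSchemaConverter.convertToTypeInfo(User.getClassSchema().toString());

		// Both should describe the same named row type.
		System.out.println(fromClass);
		System.out.println(fromSchema);
	}
}
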
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index fbabb95..ccba0a5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +32,6 @@ import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -52,6 +52,7 @@ import java.util.Map;
  */
 @RunWith(Parameterized.class)
 public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+
 	public AvroTypeExtractionTest(TestExecutionMode mode) {
 		super(mode);
 	}
@@ -80,7 +81,7 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
 				.map((value) -> value);
 
@@ -88,8 +89,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
+			"\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
+			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
+			"\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
 	}
 
 	@Test
@@ -98,24 +110,31 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceAvro();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
-				.map(new MapFunction<User, User>() {
-					@Override
-					public User map(User value) throws Exception {
-						Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
-						ab.put("hehe", 12L);
-						value.setTypeMap(ab);
-						return value;
-					}
+				.map((MapFunction<User, User>) value -> {
+					Map<CharSequence, Long> ab = new HashMap<>(1);
+					ab.put("hehe", 12L);
+					value.setTypeMap(ab);
+					return value;
 				});
 
 		usersDS.writeAsText(resultPath);
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
+			"\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
+			"\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
+			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+			"\"type_decimal_fixed\": [7, -48]}\n";
 
 	}
 
@@ -125,17 +144,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableObjectReuse();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy("name")
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
 
@@ -148,22 +167,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceAvro();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
@@ -177,22 +191,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceKryo();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
@@ -216,17 +225,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+		DataSet<Object> res = usersDS
+			.groupBy(fieldName)
+			.reduceGroup((GroupReduceFunction<User, Object>) (values, out) -> {
 				for (User u : values) {
 					out.collect(u.get(fieldName));
 				}
-			}
-		});
+			})
+			.returns(Object.class);
 		res.writeAsText(resultPath);
 		env.execute("Simple Avro read job");
 
@@ -234,14 +243,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		ExecutionConfig ec = env.getConfig();
 		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class));
 
-		if (fieldName.equals("name")) {
-			expected = "Alyssa\nCharlie";
-		} else if (fieldName.equals("type_enum")) {
-			expected = "GREEN\nRED\n";
-		} else if (fieldName.equals("type_double_test")) {
-			expected = "123.45\n1.337\n";
-		} else {
-			Assert.fail("Unknown field");
+		switch (fieldName) {
+			case "name":
+				expected = "Alyssa\nCharlie";
+				break;
+			case "type_enum":
+				expected = "GREEN\nRED\n";
+				break;
+			case "type_double_test":
+				expected = "123.45\n1.337\n";
+				break;
+			default:
+				Assert.fail("Unknown field");
+				break;
 		}
 
 		after();

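The lambda rewrites above also add explicit .returns(...) hints. Flink cannot recover a lambda's generic output type (such as Tuple2<String, Integer>) after compiler type erasure, so a type hint is needed where an anonymous class would have carried that information in its signature. Below is a standalone sketch under that assumption, with hypothetical input data; it is not part of the patch.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class LambdaReturnsSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Integer>> counts = env.fromElements("Alyssa", "Charlie")
			// An anonymous MapFunction class would expose Tuple2<String, Integer> in its
			// signature; with a lambda that generic information is erased, so an explicit
			// hint via returns(...) is added, mirroring the changes above.
			.map((MapFunction<String, Tuple2<String, Integer>>) name -> new Tuple2<>(name, 1))
			.returns(Types.TUPLE(Types.STRING, Types.INT));

		counts.print();
	}
}
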
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index f641636..cfd1506 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
 
 import org.junit.Test;
@@ -51,12 +51,16 @@ import static org.junit.Assert.assertTrue;
  * works properly.
  *
  * <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots.
+ *
+ * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom Kryo registrations (which
+ * logical types require for Avro 1.8 because Kryo does not support Joda-Time). We introduced a
+ * simpler user record for pre-Avro 1.8 test cases.
  */
 public class BackwardsCompatibleAvroSerializerTest {
 
-	private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot";
+	private static final String SNAPSHOT_RESOURCE = "flink-1.6-avro-type-serializer-snapshot";
 
-	private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data";
+	private static final String DATA_RESOURCE = "flink-1.6-avro-type-serialized-data";
 
 	@SuppressWarnings("unused")
 	private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE;
@@ -73,7 +77,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 
 		// retrieve the old config snapshot
 
-		final TypeSerializer<User> serializer;
+		final TypeSerializer<SimpleUser> serializer;
 		final TypeSerializerConfigSnapshot configSnapshot;
 
 		try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
@@ -86,7 +90,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 			assertEquals(1, deserialized.size());
 
 			@SuppressWarnings("unchecked")
-			final TypeSerializer<User> typedSerializer = (TypeSerializer<User>) deserialized.get(0).f0;
+			final TypeSerializer<SimpleUser> typedSerializer = (TypeSerializer<SimpleUser>) deserialized.get(0).f0;
 
 			serializer = typedSerializer;
 			configSnapshot = deserialized.get(0).f1;
@@ -104,14 +108,14 @@ public class BackwardsCompatibleAvroSerializerTest {
 		// sanity check for the test: check that a PoJoSerializer and the original serializer work together
 		assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
 
-		final TypeSerializer<User> newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+		final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 		assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
 
 		// deserialize the data and make sure this still works
 		validateDeserialization(newSerializer);
 
 		TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration();
-		final TypeSerializer<User> nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+		final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 
 		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
 
@@ -119,7 +123,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 		validateDeserialization(nextSerializer);
 	}
 
-	private static void validateDeserialization(TypeSerializer<User> serializer) throws IOException {
+	private static void validateDeserialization(TypeSerializer<SimpleUser> serializer) throws IOException {
 		final Random rnd = new Random(RANDOM_SEED);
 
 		try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
@@ -128,10 +132,10 @@ public class BackwardsCompatibleAvroSerializerTest {
 			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
 
 			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-				final User deserialized = serializer.deserialize(inView);
+				final SimpleUser deserialized = serializer.deserialize(inView);
 
 				// deterministically generate a reference record
-				final User reference = TestDataGenerator.generateRandomUser(rnd);
+				final SimpleUser reference = TestDataGenerator.generateRandomSimpleUser(rnd);
 
 				assertEquals(reference, deserialized);
 			}
@@ -141,9 +145,9 @@ public class BackwardsCompatibleAvroSerializerTest {
 // run this code to generate the test data
 //	public static void main(String[] args) throws Exception {
 //
-//		AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
+//		AvroTypeInfo<SimpleUser> typeInfo = new AvroTypeInfo<>(SimpleUser.class);
 //
-//		TypeSerializer<User> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
+//		TypeSerializer<SimpleUser> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
 //		TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration();
 //
 //		try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
@@ -160,7 +164,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 //			final Random rnd = new Random(RANDOM_SEED);
 //
 //			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-//				serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out);
+//				serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
 //			}
 //		}
 //	}
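
As the updated class-level Javadoc notes, Kryo has no built-in support for Joda-Time, which is why the snapshot data for this test is regenerated with the simpler SimpleUser record instead of User. For types that do fall back to Flink's Kryo path, a Joda-Time class such as org.joda.time.LocalDate would need an explicit registration. Below is a minimal sketch of such a registration, assuming Kryo 2.x and Joda-Time on the classpath; the class name JodaLocalDateKryoSerializer and the year/month/day encoding are illustrative assumptions, not part of this commit.

import org.apache.flink.api.common.ExecutionConfig;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.joda.time.LocalDate;

/** Hypothetical Kryo serializer for Joda-Time's LocalDate (illustration only). */
public class JodaLocalDateKryoSerializer extends Serializer<LocalDate> {

	@Override
	public void write(Kryo kryo, Output output, LocalDate date) {
		// encode the date as year / month-of-year / day-of-month
		output.writeInt(date.getYear());
		output.writeInt(date.getMonthOfYear());
		output.writeInt(date.getDayOfMonth());
	}

	@Override
	public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
		return new LocalDate(input.readInt(), input.readInt(), input.readInt());
	}

	/** Registers the serializer with Flink's Kryo fallback. */
	public static void register(ExecutionConfig config) {
		config.registerTypeWithKryoSerializer(LocalDate.class, JodaLocalDateKryoSerializer.class);
	}
}

In a job, the same registration could be made through the ExecutionConfig obtained from the execution environment before the type is first used.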

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index ce23ccc..9d77f32 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -18,87 +18,44 @@
 
 package org.apache.flink.formats.avro.utils;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 
 /**
  * Utilities for creating Avro Schemas.
  */
 public final class AvroTestUtils {
 
-	private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
-
-	/**
-	 * Creates a flat Avro Schema for testing.
-	 */
-	public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
-		final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
-			.record("BasicAvroRecord")
-			.namespace(NAMESPACE)
-			.fields();
-
-		final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
-		for (int i = 0; i < fieldNames.length; i++) {
-			Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
-			Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema));
-			fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
-		}
-
-		return fieldAssembler.endRecord();
-	}
-
 	/**
-	 * Tests a simple Avro data types without nesting.
+	 * Tests all Avro data types as well as nested types for a specific record.
 	 */
-	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getSimpleTestData() {
-		final Address addr = Address.newBuilder()
-			.setNum(42)
-			.setStreet("Main Street 42")
-			.setCity("Test City")
-			.setState("Test State")
-			.setZip("12345")
-			.build();
-
-		final Row rowAddr = new Row(5);
-		rowAddr.setField(0, 42);
-		rowAddr.setField(1, "Main Street 42");
-		rowAddr.setField(2, "Test City");
-		rowAddr.setField(3, "Test State");
-		rowAddr.setField(4, "12345");
-
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
-		t.f0 = Address.class;
-		t.f1 = addr;
-		t.f2 = rowAddr;
-
-		return t;
-	}
-
-	/**
-	 * Tests all Avro data types as well as nested types.
-	 */
-	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getComplexTestData() {
+	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSpecificTestData() {
 		final Address addr = Address.newBuilder()
 			.setNum(42)
 			.setStreet("Main Street 42")
@@ -122,17 +79,30 @@ public final class AvroTestUtils {
 			.setTypeDoubleTest(1.337d)
 			.setTypeNullTest(null)
 			.setTypeBoolTest(false)
-			.setTypeArrayString(new ArrayList<CharSequence>())
-			.setTypeArrayBoolean(new ArrayList<Boolean>())
+			.setTypeArrayString(Arrays.asList("hello", "world"))
+			.setTypeArrayBoolean(Arrays.asList(true, true, false))
 			.setTypeNullableArray(null)
 			.setTypeEnum(Colors.RED)
-			.setTypeMap(new HashMap<CharSequence, Long>())
-			.setTypeFixed(null)
-			.setTypeUnion(null)
+			.setTypeMap(Collections.singletonMap("test", 12L))
+			.setTypeFixed(new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
+			.setTypeUnion(12.0)
 			.setTypeNested(addr)
+			.setTypeBytes(ByteBuffer.allocate(10))
+			.setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+			.setTypeTimeMicros(123456)
+			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(123456L)
+			// byte array must contain the two's-complement representation of the
+			// unscaled integer value in big-endian byte order
+			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+			// array of length n can store at most
+			// Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1))
+			// base-10 digits of precision
+			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
 
-		final Row rowUser = new Row(15);
+		final Row rowUser = new Row(23);
 		rowUser.setField(0, "Charlie");
 		rowUser.setField(1, null);
 		rowUser.setField(2, "blue");
@@ -140,16 +110,24 @@ public final class AvroTestUtils {
 		rowUser.setField(4, 1.337d);
 		rowUser.setField(5, null);
 		rowUser.setField(6, false);
-		rowUser.setField(7, new ArrayList<CharSequence>());
-		rowUser.setField(8, new ArrayList<Boolean>());
+		rowUser.setField(7, new String[]{"hello", "world"});
+		rowUser.setField(8, new Boolean[]{true, true, false});
 		rowUser.setField(9, null);
-		rowUser.setField(10, Colors.RED);
-		rowUser.setField(11, new HashMap<CharSequence, Long>());
-		rowUser.setField(12, null);
-		rowUser.setField(13, null);
+		rowUser.setField(10, "RED");
+		rowUser.setField(11, Collections.singletonMap("test", 12L));
+		rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+		rowUser.setField(13, 12.0);
 		rowUser.setField(14, rowAddr);
-
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
+		rowUser.setField(15, new byte[10]);
+		rowUser.setField(16, Date.valueOf("2014-03-01"));
+		rowUser.setField(17, Time.valueOf("12:12:12"));
+		rowUser.setField(18, 123456);
+		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+		rowUser.setField(20, 123456L);
+		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
 		t.f0 = User.class;
 		t.f1 = user;
 		t.f2 = rowUser;
@@ -158,6 +136,109 @@ public final class AvroTestUtils {
 	}
 
 	/**
+	 * Tests almost all Avro data types as well as nested types for a generic record.
+	 */
+	public static Tuple3<GenericRecord, Row, Schema> getGenericTestData() {
+		final String schemaString =
+			"{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," +
+			"\"fields\": [{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," +
+			"{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}" +
+			",{\"name\":\"type_double_test\",\"type\":\"double\"},{\"name\":\"type_null_test\",\"type\":[\"null\"]}," +
+			"{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":" +
+			"{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," +
+			"\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\"," +
+			"\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
+			"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," +
+			"\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\"," +
+			"\"size\":16}],\"size\":16},{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}," +
+			"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\"," +
+			"\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}," +
+			"{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
+			"\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
+			"{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
+			"\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+			"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
+			"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
+			"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
+			"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
+		final Schema schema = new Schema.Parser().parse(schemaString);
+		GenericRecord addr = new GenericData.Record(schema.getField("type_nested").schema().getTypes().get(1));
+		addr.put("num", 42);
+		addr.put("street", "Main Street 42");
+		addr.put("city", "Test City");
+		addr.put("state", "Test State");
+		addr.put("zip", "12345");
+
+		final Row rowAddr = new Row(5);
+		rowAddr.setField(0, 42);
+		rowAddr.setField(1, "Main Street 42");
+		rowAddr.setField(2, "Test City");
+		rowAddr.setField(3, "Test State");
+		rowAddr.setField(4, "12345");
+
+		final GenericRecord user = new GenericData.Record(schema);
+		user.put("name", "Charlie");
+		user.put("favorite_number", null);
+		user.put("favorite_color", "blue");
+		user.put("type_long_test", 1337L);
+		user.put("type_double_test", 1.337d);
+		user.put("type_null_test", null);
+		user.put("type_bool_test", false);
+		user.put("type_array_string", Arrays.asList("hello", "world"));
+		user.put("type_array_boolean", Arrays.asList(true, true, false));
+		user.put("type_nullable_array", null);
+		user.put("type_enum", new GenericData.EnumSymbol(schema.getField("type_enum").schema(), "RED"));
+		user.put("type_map", Collections.singletonMap("test", 12L));
+		user.put("type_fixed", new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}));
+		user.put("type_union", 12.0);
+		user.put("type_nested", addr);
+		user.put("type_bytes", ByteBuffer.allocate(10));
+		user.put("type_date", LocalDate.parse("2014-03-01"));
+		user.put("type_time_millis", LocalTime.parse("12:12:12"));
+		user.put("type_time_micros", 123456);
+		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user.put("type_timestamp_micros", 123456L);
+		user.put("type_decimal_bytes",
+			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		user.put("type_decimal_fixed",
+			new GenericData.Fixed(
+				schema.getField("type_decimal_fixed").schema(),
+				BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+
+		final Row rowUser = new Row(23);
+		rowUser.setField(0, "Charlie");
+		rowUser.setField(1, null);
+		rowUser.setField(2, "blue");
+		rowUser.setField(3, 1337L);
+		rowUser.setField(4, 1.337d);
+		rowUser.setField(5, null);
+		rowUser.setField(6, false);
+		rowUser.setField(7, new String[]{"hello", "world"});
+		rowUser.setField(8, new Boolean[]{true, true, false});
+		rowUser.setField(9, null);
+		rowUser.setField(10, "RED");
+		rowUser.setField(11, Collections.singletonMap("test", 12L));
+		rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+		rowUser.setField(13, 12.0);
+		rowUser.setField(14, rowAddr);
+		rowUser.setField(15, new byte[10]);
+		rowUser.setField(16, Date.valueOf("2014-03-01"));
+		rowUser.setField(17, Time.valueOf("12:12:12"));
+		rowUser.setField(18, 123456);
+		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+		rowUser.setField(20, 123456L);
+		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+		final Tuple3<GenericRecord, Row, Schema> t = new Tuple3<>();
+		t.f0 = user;
+		t.f1 = rowUser;
+		t.f2 = schema;
+
+		return t;
+	}
+
+	/**
 	 * Writes given record using specified schema.
 	 * @param record record to serialize
 	 * @param schema schema to use for serialization
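
The comments on setTypeDecimalBytes and setTypeDecimalFixed above summarize how Avro's decimal logical type is encoded: the unscaled integer value as big-endian two's-complement bytes, where a fixed field of n bytes can hold at most Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1)) base-10 digits. A small, self-contained sketch (plain Java, no Flink or Avro dependencies; the class name is illustrative) that works through the BigDecimal.valueOf(2000, 2) value used in the test data:

import java.math.BigDecimal;
import java.util.Arrays;

public class DecimalEncodingExample {

	public static void main(String[] args) {
		// 20.00 with scale 2 -> unscaled value 2000
		BigDecimal value = BigDecimal.valueOf(2000, 2);

		// big-endian two's-complement bytes of the unscaled value,
		// exactly what the decimal logical type expects
		byte[] unscaled = value.unscaledValue().toByteArray();
		System.out.println(Arrays.toString(unscaled)); // [7, -48] == 0x07D0 == 2000

		// a fixed(2) field offers 8 * 2 - 1 = 15 significant bits,
		// i.e. at most floor(log10(2^15 - 1)) = 4 base-10 digits,
		// which matches precision 4 / scale 2 of the Fixed2 schema
		int size = 2;
		int maxDigits = (int) Math.floor(Math.log10(Math.pow(2, 8 * size - 1) - 1));
		System.out.println(maxDigits); // 4
	}
}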

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 9205627..a4c5bf8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -21,8 +21,16 @@ package org.apache.flink.formats.avro.utils;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
+import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.generated.User;
 
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,7 +57,35 @@ public class TestDataGenerator {
 				new HashMap<>(),
 				generateRandomFixed16(rnd),
 				generateRandomUnion(rnd),
-				generateRandomAddress(rnd));
+				generateRandomAddress(rnd),
+				generateRandomBytes(rnd),
+				LocalDate.parse("2014-03-01"),
+				LocalTime.parse("12:12:12"),
+				123456,
+				DateTime.parse("2014-03-01T12:12:12.321Z"),
+				123456L,
+				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
+				new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+	}
+
+	public static SimpleUser generateRandomSimpleUser(Random rnd) {
+		return new SimpleUser(
+				generateRandomString(rnd, 50),
+				rnd.nextBoolean() ? null : rnd.nextInt(),
+				rnd.nextBoolean() ? null : generateRandomString(rnd, 6),
+				rnd.nextBoolean() ? null : rnd.nextLong(),
+				rnd.nextDouble(),
+				null,
+				rnd.nextBoolean(),
+				generateRandomStringList(rnd, 20, 30),
+				generateRandomBooleanList(rnd, 20),
+				rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20),
+				generateRandomColor(rnd),
+				new HashMap<>(),
+				generateRandomFixed16(rnd),
+				generateRandomUnion(rnd),
+				generateRandomAddress(rnd),
+				generateRandomBytes(rnd));
 	}
 
 	public static Colors generateRandomColor(Random rnd) {
@@ -76,6 +112,12 @@ public class TestDataGenerator {
 				generateRandomString(rnd, 20));
 	}
 
+	public static ByteBuffer generateRandomBytes(Random rnd) {
+		final byte[] bytes = new byte[10];
+		rnd.nextBytes(bytes);
+		return ByteBuffer.wrap(bytes);
+	}
+
 	private static List<Boolean> generateRandomBooleanList(Random rnd, int maxEntries) {
 		final int num = rnd.nextInt(maxEntries + 1);
 		ArrayList<Boolean> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
index 2345553..342b32c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.ValidationException;
 
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +38,20 @@ public class AvroTest extends DescriptorTestBase {
 		removePropertyAndVerify(descriptors().get(0), "format.record-class");
 	}
 
+	@Test(expected = ValidationException.class)
+	public void testRecordClassAndAvroSchema() {
+		addPropertyAndVerify(descriptors().get(0), "format.avro-schema", "{...}");
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public List<Descriptor> descriptors() {
 		final Descriptor desc1 = new Avro().recordClass(User.class);
-		return Collections.singletonList(desc1);
+
+		final Descriptor desc2 = new Avro().avroSchema("{...}");
+
+		return Arrays.asList(desc1, desc2);
 	}
 
 	@Override
@@ -53,7 +61,12 @@ public class AvroTest extends DescriptorTestBase {
 		props1.put("format.property-version", "1");
 		props1.put("format.record-class", "org.apache.flink.formats.avro.generated.User");
 
-		return Collections.singletonList(props1);
+		final Map<String, String> props2 = new HashMap<>();
+		props2.put("format.type", "avro");
+		props2.put("format.property-version", "1");
+		props2.put("format.avro-schema", "{...}");
+
+		return Arrays.asList(props1, props2);
 	}
 
 	@Override