Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/03 15:17:34 UTC

[3/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats

[FLINK-9444] [formats] Add full SQL support for Avro formats

This PR adds full support for Apache Avro records to the Table API & SQL. It adds (de)serialization schemas between Avro records and Flink's Row type, for both specific and generic records. It converts all Avro types to Flink types and vice versa. It supports both physical and logical Avro types. Either an Avro class or an Avro schema string can be used for format initialization.
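
For illustration, a minimal round trip with the new schemas could look like the sketch below; the schema string mirrors the documentation example further down, and a generated specific record class could be passed to the constructors instead of the string:

import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.types.Row;

public class AvroRowFormatExample {

	public static void main(String[] args) throws Exception {
		// format initialization from an Avro schema string (generic records);
		// alternatively: new AvroRowSerializationSchema(User.class) for a specific record class
		final String avroSchema =
			"{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
				+ "{\"name\": \"a\", \"type\": \"long\"},"
				+ "{\"name\": \"b\", \"type\": \"string\"}]}";
		final AvroRowSerializationSchema serialization = new AvroRowSerializationSchema(avroSchema);
		final AvroRowDeserializationSchema deserialization = new AvroRowDeserializationSchema(avroSchema);

		// a Row with the same arity and field order as the Avro record
		final Row row = new Row(2);
		row.setField(0, 42L);
		row.setField(1, "hello");

		// Row -> Avro bytes -> Row
		final byte[] bytes = serialization.serialize(row);
		final Row result = deserialization.deserialize(bytes);
		System.out.println(result); // 42,hello
	}
}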

This closes #6218.
This closes #6082.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c34c7e41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c34c7e41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c34c7e41

Branch: refs/heads/master
Commit: c34c7e4127c8947d68e2b960cd84206e59d479b3
Parents: 19040a6
Author: Timo Walther <tw...@apache.org>
Authored: Tue Jun 26 11:46:06 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jul 3 15:40:44 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sqlClient.md                     |  73 +++-
 .../connectors/kafka/KafkaAvroTableSource.java  |   4 +-
 .../kafka/KafkaAvroTableSourceFactory.java      |   4 +-
 .../kafka/KafkaAvroTableSourceTestBase.java     |  83 +----
 .../flink/api/java/typeutils/RowTypeInfo.java   |  24 +-
 .../api/java/typeutils/RowTypeInfoTest.java     |  29 +-
 .../kryo/KryoWithCustomSerializersTest.java     |   8 +-
 flink-formats/flink-avro/pom.xml                |  37 +++
 .../avro/AvroRowDeserializationSchema.java      | 329 ++++++++++++++-----
 .../avro/AvroRowSerializationSchema.java        | 269 ++++++++++++---
 .../typeutils/AvroRecordClassConverter.java     |  81 -----
 .../avro/typeutils/AvroSchemaConverter.java     | 160 +++++++++
 .../avro/utils/AvroKryoSerializerUtils.java     |  70 ++++
 .../apache/flink/table/descriptors/Avro.java    |  23 +-
 .../flink/table/descriptors/AvroValidator.java  |  15 +-
 .../formats/avro/AvroOutputFormatITCase.java    |  41 ++-
 .../formats/avro/AvroOutputFormatTest.java      |  23 +-
 .../formats/avro/AvroRecordInputFormatTest.java |  73 ++--
 .../avro/AvroRowDeSerializationSchemaTest.java  | 127 +++++--
 .../avro/AvroSplittableInputFormatTest.java     | 115 +++++--
 .../flink/formats/avro/EncoderDecoderTest.java  | 157 +++++----
 .../avro/typeutils/AvroSchemaConverterTest.java | 116 +++++++
 .../avro/typeutils/AvroTypeExtractionTest.java  | 134 ++++----
 .../BackwardsCompatibleAvroSerializerTest.java  |  30 +-
 .../flink/formats/avro/utils/AvroTestUtils.java | 223 +++++++++----
 .../formats/avro/utils/TestDataGenerator.java   |  44 ++-
 .../flink/table/descriptors/AvroTest.java       |  19 +-
 .../table/runtime/batch/AvroTypesITCase.java    |  85 +++--
 .../src/test/resources/avro/user.avsc           |  63 +++-
 .../flink-1.3-avro-type-serialized-data         | Bin 23829 -> 0 bytes
 .../flink-1.3-avro-type-serializer-snapshot     | Bin 33772 -> 0 bytes
 .../flink-1.6-avro-type-serialized-data         | Bin 0 -> 23563 bytes
 .../flink-1.6-avro-type-serializer-snapshot     | Bin 0 -> 36411 bytes
 .../json/JsonRowDeserializationSchema.java      |   1 -
 .../flink/table/typeutils/TypeStringUtils.scala |   7 +-
 .../table/typeutils/TypeStringUtilsTest.scala   |  26 +-
 36 files changed, 1819 insertions(+), 674 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 2bdec2b..f850082 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -237,6 +237,7 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
 | :---------------- | :--------------------- |
 | CSV               | Built-in               |
 | JSON              | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
+| Apache Avro       | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
 
 {% endif %}
 
@@ -476,7 +477,7 @@ The CSV format is included in Flink and does not require an additional JAR file.
 
 #### JSON Format
 
-The JSON format allows to read JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
+The JSON format allows to read and write JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink [type string](sqlClient.html#type-strings), as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
 
 If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table's schema. Time attributes are ignored. A `from` definition in the table schema is interpreted as a field renaming in the format.
 
@@ -507,6 +508,23 @@ format:
   derive-schema: true
 {% endhighlight %}
 
+The following table shows the mapping of JSON schema types to Flink SQL types:
+
+| JSON schema                       | Flink SQL               |
+| :-------------------------------- | :---------------------- |
+| `object`                          | `ROW`                   |
+| `boolean`                         | `BOOLEAN`               |
+| `array`                           | `ARRAY[_]`              |
+| `number`                          | `DECIMAL`               |
+| `integer`                         | `DECIMAL`               |
+| `string`                          | `VARCHAR`               |
+| `string` with `format: date-time` | `TIMESTAMP`             |
+| `string` with `format: date`      | `DATE`                  |
+| `string` with `format: time`      | `TIME`                  |
+| `string` with `encoding: base64`  | `ARRAY[TINYINT]`        |
+| `null`                            | `NULL` (unsupported yet)|
+
+
 Currently, Flink supports only a subset of the [JSON schema specification](http://json-schema.org/) `draft-07`. Union types (as well as `allOf`, `anyOf`, `not`) are not supported yet. `oneOf` and arrays of types are only supported for specifying nullability.
 
 Simple references that link to a common definition in the document are supported as shown in the more complex example below:
@@ -558,6 +576,59 @@ Simple references that link to a common definition in the document are supported
 
 Make sure to download the [JSON SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.
 
+#### Apache Avro Format
+
+The [Apache Avro](https://avro.apache.org/) format allows to read and write Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.
+
+{% highlight yaml %}
+format:
+  type: avro
+
+  # required: define the schema either by using an Avro specific record class
+  record-class: "org.organization.types.User"
+
+  # or by using an Avro schema
+  avro-schema: >
+    {
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }
+{% endhighlight %}
+
+Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an `ANY` type. The following table shows the mapping:
+
+| Avro schema                                 | Flink SQL               |
+| :------------------------------------------ | :---------------------- |
+| `record`                                    | `ROW`                   |
+| `enum`                                      | `VARCHAR`               |
+| `array`                                     | `ARRAY[_]`              |
+| `map`                                       | `MAP[VARCHAR, _]`       |
+| `union`                                     | non-null type or `ANY`  |
+| `fixed`                                     | `ARRAY[TINYINT]`        |
+| `string`                                    | `VARCHAR`               |
+| `bytes`                                     | `ARRAY[TINYINT]`        |
+| `int`                                       | `INT`                   |
+| `long`                                      | `BIGINT`                |
+| `float`                                     | `FLOAT`                 |
+| `double`                                    | `DOUBLE`                |
+| `boolean`                                   | `BOOLEAN`               |
+| `int` with `logicalType: date`              | `DATE`                  |
+| `int` with `logicalType: time-millis`       | `TIME`                  |
+| `int` with `logicalType: time-micros`       | `INT`                   |
+| `long` with `logicalType: timestamp-millis` | `TIMESTAMP`             |
+| `long` with `logicalType: timestamp-micros` | `BIGINT`                |
+| `bytes` with `logicalType: decimal`         | `DECIMAL`               |
+| `fixed` with `logicalType: decimal`         | `DECIMAL`               |
+| `null`                                      | `NULL` (unsupported yet)|
+
+Avro uses [Joda-Time](http://www.joda.org/joda-time/) for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink's SQL JAR distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.
+
+Make sure to download the [Apache Avro SQL JAR](sqlClient.html#dependencies) file and pass it to the SQL Client.
+
 {% top %}
 
 Limitations & Future

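On the Table API side, the same format can presumably be declared through the org.apache.flink.table.descriptors.Avro descriptor touched by this commit; a minimal sketch, where the recordClass(...)/avroSchema(...) method names are assumptions based on the documentation above:

import org.apache.flink.table.descriptors.Avro;

public class AvroDescriptorSketch {

	public static void main(String[] args) {
		// sketch only: recordClass(...)/avroSchema(...) are assumed descriptor methods
		final Avro format = new Avro()
			.avroSchema(
				"{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
					+ "{\"name\": \"a\", \"type\": \"long\"},"
					+ "{\"name\": \"b\", \"type\": \"string\"}]}");

		// with a generated specific record class on the classpath instead:
		// new Avro().recordClass(org.organization.types.User.class);
	}
}
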
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 7828a1c..8c8ce32 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -64,7 +64,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
 			topic,
 			properties,
 			schema,
-			AvroRecordClassConverter.convert(avroRecordClass));
+			AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
 
 		this.avroRecordClass = avroRecordClass;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
index 1401914..8ef7270 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.AvroValidator;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -65,7 +65,7 @@ public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactor
 		final Class<? extends SpecificRecordBase> avroRecordClass =
 				params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
 		builder.forAvroRecordClass(avroRecordClass);
-		final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroRecordClassConverter.convert(avroRecordClass));
+		final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
 
 		// field mapping
 		final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index 16beb7d..f86fc95 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -18,17 +18,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
+import org.apache.flink.formats.avro.generated.SchemaRecord;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Types;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
-import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,7 +41,7 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 	@Override
 	protected void configureBuilder(KafkaTableSource.Builder builder) {
 		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
 	}
 
 	@Test
@@ -67,8 +64,8 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		// check field types
 		assertEquals(Types.LONG(), returnType.getTypeAt(0));
 		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+		assertEquals(Types.LONG(), returnType.getTypeAt(2));
+		assertEquals(Types.LONG(), returnType.getTypeAt(3));
 		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
 
 		// check field mapping
@@ -91,7 +88,7 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		mapping.put("field3", "otherField3");
 
 		// set Avro class with different fields
-		b.forAvroRecordClass(DifferentFieldsAvroClass.class);
+		b.forAvroRecordClass(DifferentSchemaRecord.class);
 		b.withTableToAvroMapping(mapping);
 
 		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
@@ -110,9 +107,9 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		// check field types
 		assertEquals(Types.LONG(), returnType.getTypeAt(0));
 		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+		assertEquals(Types.LONG(), returnType.getTypeAt(2));
 		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
-		assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+		assertEquals(Types.FLOAT(), returnType.getTypeAt(4));
 		assertEquals(Types.INT(), returnType.getTypeAt(5));
 
 		// check field mapping
@@ -127,68 +124,4 @@ public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestB
 		assertEquals(source.getReturnType(),
 			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
 	}
-
-	/**
-	 * Avro record that matches the table schema.
-	 */
-	@SuppressWarnings("unused")
-	public static class SameFieldsAvroClass extends SpecificRecordBase {
-
-		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
-		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
-		//CHECKSTYLE.ON: StaticVariableNameCheck
-
-		public Long field1;
-		public String field2;
-		public Timestamp time1;
-		public Timestamp time2;
-		public Double field3;
-
-		@Override
-		public Schema getSchema() {
-			return null;
-		}
-
-		@Override
-		public Object get(int field) {
-			return null;
-		}
-
-		@Override
-		public void put(int field, Object value) { }
-	}
-
-	/**
-	 * Avro record that does NOT match the table schema.
-	 */
-	@SuppressWarnings("unused")
-	public static class DifferentFieldsAvroClass extends SpecificRecordBase {
-
-		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
-		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(
-			new String[]{"otherField1", "otherField2", "otherTime1", "otherField3", "otherField4", "otherField5"},
-			new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.BYTE(), Types.INT()});
-		//CHECKSTYLE.ON: StaticVariableNameCheck
-
-		public Long otherField1;
-		public String otherField2;
-		public Timestamp otherTime1;
-		public Double otherField3;
-		public Byte otherField4;
-		public Integer otherField5;
-
-		@Override
-		public Schema getSchema() {
-			return null;
-		}
-
-		@Override
-		public Object get(int field) {
-			return null;
-		}
-
-		@Override
-		public void put(int field, Object value) { }
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 24ccfb1..75c28ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -249,7 +249,22 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 
 	@Override
 	public int hashCode() {
-		return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+		return 31 * super.hashCode();
+	}
+
+	/**
+	 * The equals method does only check for field types. Field names do not matter during
+	 * runtime so we can consider rows with the same field types as equal.
+	 * Use {@link RowTypeInfo#schemaEquals(Object)} for checking schema-equivalence.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof RowTypeInfo) {
+			final RowTypeInfo other = (RowTypeInfo) obj;
+			return other.canEqual(this) && super.equals(other);
+		} else {
+			return false;
+		}
 	}
 
 	@Override
@@ -274,6 +289,13 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 		return types;
 	}
 
+	/**
+	 * Tests whether an other object describes the same, schema-equivalent row information.
+	 */
+	public boolean schemaEquals(Object obj) {
+		return equals(obj) && Arrays.equals(fieldNames, ((RowTypeInfo) obj).fieldNames);
+	}
+
 	private boolean hasDuplicateFieldNames(String[] fieldNames) {
 		HashSet<String> names = new HashSet<>();
 		for (String field : fieldNames) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
index 03d1e04..f17ca95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -28,7 +29,8 @@ import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for {@link RowTypeInfo}.
@@ -47,7 +49,10 @@ public class RowTypeInfoTest extends TypeInformationTestBase<RowTypeInfo> {
 		return new RowTypeInfo[] {
 			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
 			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO),
-			new RowTypeInfo(typeList)
+			new RowTypeInfo(typeList),
+			new RowTypeInfo(
+				new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO},
+				new String[]{"int", "int2"})
 		};
 	}
 
@@ -123,4 +128,24 @@ public class RowTypeInfoTest extends TypeInformationTestBase<RowTypeInfo> {
 		assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
 	}
 
+	@Test
+	public void testSchemaEquals() {
+		final RowTypeInfo row1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		final RowTypeInfo row2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertTrue(row1.schemaEquals(row2));
+
+		final RowTypeInfo other1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"otherField", "field2"});
+		final RowTypeInfo other2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertFalse(row1.schemaEquals(other1));
+		assertFalse(row1.schemaEquals(other2));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
index d68afd6..e28221f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
@@ -55,9 +55,9 @@ public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializer
 		TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
 		return typeInfo.createSerializer(conf);
 	}
-	
+
 	public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
-		
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -66,10 +66,10 @@ public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializer
 			output.writeInt(object.getMonthOfYear());
 			output.writeInt(object.getDayOfMonth());
 		}
-		
+
 		@Override
 		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
 			return new LocalDate(input.readInt(), input.readInt(), input.readInt());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 2a437f6..dbf7fd0 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -52,6 +52,17 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<!-- managed version -->
+			<scope>provided</scope>
+			<!-- Avro records can contain JodaTime fields when using logical fields.
+				In order to handle them, we need to add an optional dependency.
+				Users with those Avro records need to add this dependency themselves. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<!-- use a dedicated Scala version to not depend on it -->
 			<artifactId>flink-table_2.11</artifactId>
@@ -97,6 +108,32 @@ under the License.
 		</dependency>
 	</dependencies>
 
+	<profiles>
+		<profile>
+			<!-- Create SQL Client uber jars for releases -->
+			<id>release</id>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-jar-plugin</artifactId>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>jar</goal>
+								</goals>
+								<configuration>
+									<classifier>sql-jar</classifier>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 276257a..c36a4be 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -17,116 +17,157 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * <p>Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
 	/**
-	 * Avro record class.
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
 	private Class<? extends SpecificRecord> recordClazz;
 
 	/**
-	 * Schema for deterministic field order.
+	 * Schema string for deserialization.
+	 */
+	private String schemaString;
+
+	/**
+	 * Avro serialization schema.
 	 */
 	private transient Schema schema;
 
 	/**
-	 * Reader that deserializes byte array into a record.
+	 * Type information describing the result type.
 	 */
-	private transient DatumReader<SpecificRecord> datumReader;
+	private transient RowTypeInfo typeInfo;
 
 	/**
-	 * Input stream to read message from.
+	 * Record to deserialize byte array.
 	 */
-	private transient MutableByteArrayInputStream inputStream;
+	private transient IndexedRecord record;
 
 	/**
-	 * Avro decoder that decodes binary data.
+	 * Reader that deserializes byte array into a record.
 	 */
-	private transient Decoder decoder;
+	private transient DatumReader<IndexedRecord> datumReader;
 
 	/**
-	 * Record to deserialize byte array to.
+	 * Input stream to read message from.
 	 */
-	private SpecificRecord record;
+	private transient MutableByteArrayInputStream inputStream;
 
 	/**
-	 * Type information describing the result type.
+	 * Avro decoder that decodes binary data.
 	 */
-	private transient TypeInformation<Row> typeInfo;
+	private transient Decoder decoder;
 
 	/**
-	 * Creates a Avro deserialization schema for the given record.
+	 * Creates a Avro deserialization schema for the given specific record class. Having the
+	 * concrete Avro record class might improve performance.
 	 *
 	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
 	 */
-	public AvroRowDeserializationSchema(Class<? extends SpecificRecordBase> recordClazz) {
+	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
 		this.recordClazz = recordClazz;
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
-		this.typeInfo = AvroRecordClassConverter.convert(recordClazz);
+		schema = SpecificData.get().getSchema(recordClazz);
+		typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
+		schemaString = schema.toString();
+		record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
+		datumReader = new SpecificDatumReader<>(schema);
+		inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	/**
+	 * Creates a Avro deserialization schema for the given Avro schema string.
+	 *
+	 * @param avroSchemaString Avro schema string to deserialize Avro's record to Flink's row
+	 */
+	public AvroRowDeserializationSchema(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		recordClazz = null;
+		final TypeInformation<?> typeInfo = AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+		Preconditions.checkArgument(typeInfo instanceof RowTypeInfo, "Row type information expected.");
+		this.typeInfo = (RowTypeInfo) typeInfo;
+		schemaString = avroSchemaString;
+		schema = new Schema.Parser().parse(avroSchemaString);
+		record = new GenericData.Record(schema);
+		datumReader = new GenericDatumReader<>(schema);
+		inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 	}
 
 	@Override
 	public Row deserialize(byte[] message) throws IOException {
-		// read record
 		try {
 			inputStream.setBuffer(message);
-			this.record = datumReader.read(record, decoder);
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to deserialize Row.", e);
+			record = datumReader.read(record, decoder);
+			return convertAvroRecordToRow(schema, typeInfo, record);
+		} catch (Exception e) {
+			throw new IOException("Failed to deserialize Avro record.", e);
 		}
-
-		// convert to row
-		final Object row = convertToRow(schema, record);
-		return (Row) row;
-	}
-
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 	}
 
 	@Override
@@ -134,37 +175,175 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return typeInfo;
 	}
 
-	/**
-	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
-	 * Avro's {@link Utf8} fields are converted into regular Java strings.
-	 */
-	private static Object convertToRow(Schema schema, Object recordObj) {
-		if (recordObj instanceof GenericRecord) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
+	// --------------------------------------------------------------------------------------------
+
+	private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+		final List<Schema.Field> fields = schema.getFields();
+		final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
+		final int length = fields.size();
+		final Row row = new Row(length);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			row.setField(i, convertAvroType(field.schema(), fieldInfo[i], record.get(i)));
+		}
+		return row;
+	}
+
+	private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+		// we perform the conversion based on schema information but enriched with pre-computed
+		// type information where useful (i.e., for arrays)
+
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof IndexedRecord) {
+					return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
+				}
+				throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
+			case ENUM:
+			case STRING:
+				return object.toString();
+			case ARRAY:
+				if (info instanceof BasicArrayTypeInfo) {
+					final TypeInformation<?> elementInfo = ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo();
+					return convertToObjectArray(schema.getElementType(), elementInfo, object);
+				} else {
+					final TypeInformation<?> elementInfo = ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo();
+					return convertToObjectArray(schema.getElementType(), elementInfo, object);
+				}
+			case MAP:
+				final MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) info;
+				final Map<String, Object> convertedMap = new HashMap<>();
+				final Map<?, ?> map = (Map<?, ?>) object;
+				for (Map.Entry<?, ?> entry : map.entrySet()) {
+					convertedMap.put(
+						entry.getKey().toString(),
+						convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
+				}
+				return convertedMap;
+			case UNION:
 				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
+				final int size = types.size();
+				final Schema actualSchema;
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(1), info, object);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					return convertAvroType(types.get(0), info, object);
+				} else if (size == 1) {
+					return convertAvroType(types.get(0), info, object);
+				} else {
+					// generic type
+					return object;
+				}
+			case FIXED:
+				final byte[] fixedBytes = ((GenericFixed) object).bytes();
+				if (info == Types.BIG_DEC) {
+					return convertToDecimal(schema, fixedBytes);
 				}
-				else {
-					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+				return fixedBytes;
+			case BYTES:
+				final ByteBuffer byteBuffer = (ByteBuffer) object;
+				final byte[] bytes = new byte[byteBuffer.remaining()];
+				byteBuffer.get(bytes);
+				if (info == Types.BIG_DEC) {
+					return convertToDecimal(schema, bytes);
 				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final Row row = new Row(fields.size());
-			final GenericRecord record = (GenericRecord) recordObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
-			}
-			return row;
-		} else if (recordObj instanceof Utf8) {
-			return recordObj.toString();
+				return bytes;
+			case INT:
+				if (info == Types.SQL_DATE) {
+					return convertToDate(object);
+				} else if (info == Types.SQL_TIME) {
+					return convertToTime(object);
+				}
+				return object;
+			case LONG:
+				if (info == Types.SQL_TIMESTAMP) {
+					return convertToTimestamp(object);
+				}
+				return object;
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type:" + schema);
+	}
+
+	private BigDecimal convertToDecimal(Schema schema, byte[] bytes) {
+		final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+		return new BigDecimal(new BigInteger(bytes), decimalType.getScale());
+	}
+
+	private Date convertToDate(Object object) {
+		final long millis;
+		if (object instanceof Integer) {
+			final Integer value = (Integer) object;
+			// adopted from Apache Calcite
+			final long t = (long) value * 86400000L;
+			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else {
+			// use 'provided' Joda time
+			final LocalDate value = (LocalDate) object;
+			millis = value.toDate().getTime();
+		}
+		return new Date(millis);
+	}
+
+	private Time convertToTime(Object object) {
+		final long millis;
+		if (object instanceof Integer) {
+			millis = (Integer) object;
 		} else {
-			return recordObj;
+			// use 'provided' Joda time
+			final LocalTime value = (LocalTime) object;
+			millis = (long) value.get(DateTimeFieldType.millisOfDay());
 		}
+		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
 
+	private Timestamp convertToTimestamp(Object object) {
+		final long millis;
+		if (object instanceof Long) {
+			millis = (Long) object;
+		} else {
+			// use 'provided' Joda time
+			final DateTime value = (DateTime) object;
+			millis = value.toDate().getTime();
+		}
+		return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
+	}
+
+	private Object[] convertToObjectArray(Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+		final List<?> list = (List<?>) object;
+		final Object[] convertedArray = (Object[]) Array.newInstance(
+			elementInfo.getTypeClass(),
+			list.size());
+		for (int i = 0; i < list.size(); i++) {
+			convertedArray[i] = convertAvroType(elementSchema, elementInfo, list.get(i));
+		}
+		return convertedArray;
+	}
+
+	private void writeObject(ObjectOutputStream outputStream) throws IOException {
+		outputStream.writeObject(recordClazz);
+		outputStream.writeUTF(schemaString);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+		recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
+		schemaString = inputStream.readUTF();
+		typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
+		schema = new Schema.Parser().parse(schemaString);
+		if (recordClazz != null) {
+			record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+		} else {
+			record = new GenericData.Record(schema);
+		}
+		datumReader = new SpecificDatumReader<>(schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
+	}
 }

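As a side note on the time handling above, the Calcite-adopted arithmetic in convertToDate shifts the Avro epoch-day count into the local time zone before wrapping it in java.sql.Date; a minimal standalone sketch with a hypothetical day count:

import java.sql.Date;
import java.util.TimeZone;

public class AvroDateConversionSketch {

	public static void main(String[] args) {
		// an Avro 'int' with logicalType 'date' counts days since 1970-01-01
		final int avroDays = 17000; // hypothetical value
		// same arithmetic as convertToDate: days -> millis (UTC), then shift by the local offset
		final long t = (long) avroDays * 86400000L;
		final long millis = t - TimeZone.getDefault().getOffset(t);
		System.out.println(new Date(millis)); // prints 2016-07-18 (local midnight of the encoded day)
	}
}
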
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 41000a6..80f5f1d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -19,12 +19,18 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
@@ -37,19 +43,43 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * <p>Serializes objects that are represented in (nested) Flink rows. It support types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
 	/**
-	 * Avro record class.
+	 * Used for time conversions from SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for serialization. Might be null if record class is not available.
 	 */
 	private Class<? extends SpecificRecord> recordClazz;
 
 	/**
+	 * Schema string for deserialization.
+	 */
+	private String schemaString;
+
+	/**
 	 * Avro serialization schema.
 	 */
 	private transient Schema schema;
@@ -57,93 +87,226 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 	/**
 	 * Writer to serialize Avro record into a byte array.
 	 */
-	private transient DatumWriter<GenericRecord> datumWriter;
+	private transient DatumWriter<IndexedRecord> datumWriter;
 
 	/**
 	 * Output stream to serialize records into byte array.
 	 */
-	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+	private transient ByteArrayOutputStream arrayOutputStream;
 
 	/**
 	 * Low-level class for serialization of Avro values.
 	 */
-	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private transient Encoder encoder;
 
 	/**
-	 * Creates a Avro serialization schema for the given schema.
+	 * Creates an Avro serialization schema for the given specific record class.
 	 *
-	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
 	 */
 	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
 		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.schemaString = schema.toString();
 		this.datumWriter = new SpecificDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	}
+
+	/**
+	 * Creates an Avro serialization schema for the given Avro schema string.
+	 *
+	 * @param avroSchemaString Avro schema string used to serialize Flink's row to Avro's record
+	 */
+	public AvroRowSerializationSchema(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		this.recordClazz = null;
+		this.schemaString = avroSchemaString;
+		try {
+			this.schema = new Schema.Parser().parse(avroSchemaString);
+		} catch (SchemaParseException e) {
+			throw new IllegalArgumentException("Could not parse Avro schema string.", e);
+		}
+		this.datumWriter = new GenericDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public byte[] serialize(Row row) {
-		// convert to record
-		final Object record = convertToRecord(schema, row);
-
-		// write
 		try {
+			// convert to record
+			final GenericRecord record = convertRowToAvroRecord(schema, row);
 			arrayOutputStream.reset();
-			datumWriter.write((GenericRecord) record, encoder);
+			datumWriter.write(record, encoder);
 			encoder.flush();
 			return arrayOutputStream.toByteArray();
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to serialize Row.", e);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to serialize row.", e);
 		}
 	}
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
+	// --------------------------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumWriter = new SpecificDatumWriter<>(schema);
-		this.arrayOutputStream = new ByteArrayOutputStream();
-		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private GenericRecord convertRowToAvroRecord(Schema schema, Row row) {
+		final List<Schema.Field> fields = schema.getFields();
+		final int length = fields.size();
+		final GenericRecord record = new GenericData.Record(schema);
+		for (int i = 0; i < length; i++) {
+			final Schema.Field field = fields.get(i);
+			record.put(i, convertFlinkType(field.schema(), row.getField(i)));
+		}
+		return record;
 	}
 
-	/**
-	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
-	 * Strings are converted into Avro's {@link Utf8} fields.
-	 */
-	private static Object convertToRecord(Schema schema, Object rowObj) {
-		if (rowObj instanceof Row) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
+	private Object convertFlinkType(Schema schema, Object object) {
+		if (object == null) {
+			return null;
+		}
+		switch (schema.getType()) {
+			case RECORD:
+				if (object instanceof Row) {
+					return convertRowToAvroRecord(schema, (Row) object);
+				}
+				throw new IllegalStateException("Row expected but was: " + object.getClass());
+			case ENUM:
+				return new GenericData.EnumSymbol(schema, object.toString());
+			case ARRAY:
+				final Schema elementSchema = schema.getElementType();
+				final Object[] array = (Object[]) object;
+				final GenericData.Array<Object> convertedArray = new GenericData.Array<>(array.length, schema);
+				for (Object element : array) {
+					convertedArray.add(convertFlinkType(elementSchema, element));
+				}
+				return convertedArray;
+			case MAP:
+				final Map<?, ?> map = (Map<?, ?>) object;
+				final Map<Utf8, Object> convertedMap = new HashMap<>();
+				for (Map.Entry<?, ?> entry : map.entrySet()) {
+					convertedMap.put(
+						new Utf8(entry.getKey().toString()),
+						convertFlinkType(schema.getValueType(), entry.getValue()));
+				}
+				return convertedMap;
+			case UNION:
 				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
+				final int size = types.size();
+				final Schema actualSchema;
+				if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(1);
+				} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+					actualSchema = types.get(0);
+				} else if (size == 1) {
+					actualSchema = types.get(0);
+				} else {
+					// generic type
+					return object;
+				}
+				return convertFlinkType(actualSchema, object);
+			case FIXED:
+				// check for logical type
+				if (object instanceof BigDecimal) {
+					return new GenericData.Fixed(
+						schema,
+						convertFromDecimal(schema, (BigDecimal) object));
 				}
-				else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
-					schema = types.get(0);
+				return new GenericData.Fixed(schema, (byte[]) object);
+			case STRING:
+				return new Utf8(object.toString());
+			case BYTES:
+				// check for logical type
+				if (object instanceof BigDecimal) {
+					return ByteBuffer.wrap(convertFromDecimal(schema, (BigDecimal) object));
 				}
-				else {
-					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, NULL] Given: " + schema);
+				return ByteBuffer.wrap((byte[]) object);
+			case INT:
+				// check for logical types
+				if (object instanceof Date) {
+					return convertFromDate(schema, (Date) object);
+				} else if (object instanceof Time) {
+					return convertFromTime(schema, (Time) object);
 				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final GenericRecord record = new GenericData.Record(schema);
-			final Row row = (Row) rowObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
-			}
-			return record;
-		} else if (rowObj instanceof String) {
-			return new Utf8((String) rowObj);
+				return object;
+			case LONG:
+				// check for logical type
+				if (object instanceof Timestamp) {
+					return convertFromTimestamp(schema, (Timestamp) object);
+				}
+				return object;
+			case FLOAT:
+			case DOUBLE:
+			case BOOLEAN:
+				return object;
+		}
+		throw new RuntimeException("Unsupported Avro type:" + schema);
+	}
+
+	private byte[] convertFromDecimal(Schema schema, BigDecimal decimal) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType instanceof LogicalTypes.Decimal) {
+			final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+			// rescale to target type
+			final BigDecimal rescaled = decimal.setScale(decimalType.getScale(), BigDecimal.ROUND_UNNECESSARY);
+			// byte array must contain the two's-complement representation of the
+			// unscaled integer value in big-endian byte order
+			return decimal.unscaledValue().toByteArray();
+		} else {
+			throw new RuntimeException("Unsupported decimal type.");
+		}
+	}
+
+	private int convertFromDate(Schema schema, Date date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.date()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			return (int) (converted / 86400000L);
+		} else {
+			throw new RuntimeException("Unsupported date type.");
+		}
+	}
+
+	private int convertFromTime(Schema schema, Time date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timeMillis()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			final long converted = time + (long) LOCAL_TZ.getOffset(time);
+			return (int) (converted % 86400000L);
+		} else {
+			throw new RuntimeException("Unsupported time type.");
+		}
+	}
+
+	private long convertFromTimestamp(Schema schema, Timestamp date) {
+		final LogicalType logicalType = schema.getLogicalType();
+		if (logicalType == LogicalTypes.timestampMillis()) {
+			// adopted from Apache Calcite
+			final long time = date.getTime();
+			return time + (long) LOCAL_TZ.getOffset(time);
+		} else {
+			throw new RuntimeException("Unsupported timestamp type.");
+		}
+	}
+
+	private void writeObject(ObjectOutputStream outputStream) throws IOException {
+		outputStream.writeObject(recordClazz);
+		outputStream.writeObject(schemaString); // support for null
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+		recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
+		schemaString = (String) inputStream.readObject();
+		if (recordClazz != null) {
+			schema = SpecificData.get().getSchema(recordClazz);
 		} else {
-			return rowObj;
+			schema = new Schema.Parser().parse(schemaString);
 		}
+		datumWriter = new SpecificDatumWriter<>(schema);
+		arrayOutputStream = new ByteArrayOutputStream();
+		encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 	}
 }
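
For readers of the conversion methods above, a minimal standalone sketch of the epoch-based arithmetic (not part of the patch; the class name is made up and the default JVM time zone stands in for the LOCAL_TZ field):

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.TimeZone;

public class AvroTimeConversionSketch {

	public static void main(String[] args) {
		final TimeZone localTz = TimeZone.getDefault();

		// DATE: shift from the local time zone to UTC, then count whole days since 1970-01-01
		final Date date = Date.valueOf("2014-03-01");
		final long dateUtc = date.getTime() + localTz.getOffset(date.getTime());
		final int epochDays = (int) (dateUtc / 86400000L);

		// TIME (time-millis): same shift, but keep only the millisecond of the day
		final Time time = Time.valueOf("12:12:12");
		final long timeUtc = time.getTime() + localTz.getOffset(time.getTime());
		final int millisOfDay = (int) (timeUtc % 86400000L);

		// TIMESTAMP (timestamp-millis): shift to UTC epoch milliseconds
		final Timestamp ts = Timestamp.valueOf("2014-03-01 12:12:12.321");
		final long epochMillis = ts.getTime() + localTz.getOffset(ts.getTime());

		System.out.println(epochDays + " / " + millisOfDay + " / " + epochMillis);
	}
}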

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
deleted file mode 100644
index b7b4871..0000000
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
-
-import java.util.List;
-
-/**
- * Utilities for Avro record class conversion.
- */
-public class AvroRecordClassConverter {
-
-	private AvroRecordClassConverter() {
-		// private
-	}
-
-	/**
-	 * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T extends SpecificRecordBase> TypeInformation<Row> convert(Class<T> avroClass) {
-		final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
-		// determine schema to retrieve deterministic field order
-		final Schema schema = SpecificData.get().getSchema(avroClass);
-		return (TypeInformation<Row>) convertType(avroTypeInfo, schema);
-	}
-
-	/**
-	 * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	private static TypeInformation<?> convertType(TypeInformation<?> extracted, Schema schema) {
-		if (schema.getType() == Schema.Type.RECORD) {
-			final List<Schema.Field> fields = schema.getFields();
-			final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
-
-			final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
-			final String[] names = new String[fields.size()];
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				types[i] = convertType(avroTypeInfo.getTypeAt(field.name()), field.schema());
-				names[i] = field.name();
-			}
-			return new RowTypeInfo(types, names);
-		} else if (extracted instanceof GenericTypeInfo<?>) {
-			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
-			if (genericTypeInfo.getTypeClass() == Utf8.class) {
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			}
-		}
-		return extracted;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
new file mode 100644
index 0000000..6e49df2
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.List;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for representing
+ * objects and converts Avro types into types that are compatible with Flink's Table & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * classes {@link AvroRowDeserializationSchema} and {@link AvroRowSerializationSchema}.
+ */
+public class AvroSchemaConverter {
+
+	private AvroSchemaConverter() {
+		// private
+	}
+
+	/**
+	 * Converts an Avro class into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroClass Avro specific record that contains schema information
+	 * @return type information matching the schema
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass) {
+		Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+		// determine schema to retrieve deterministic field order
+		final Schema schema = SpecificData.get().getSchema(avroClass);
+		return (TypeInformation<Row>) convertToTypeInfo(schema);
+	}
+
+	/**
+	 * Converts an Avro schema string into a nested row structure with deterministic field order and data
+	 * types that are compatible with Flink's Table & SQL API.
+	 *
+	 * @param avroSchemaString Avro schema definition string
+	 * @return type information matching the schema
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) {
+		Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+		final Schema schema;
+		try {
+			schema = new Schema.Parser().parse(avroSchemaString);
+		} catch (SchemaParseException e) {
+			throw new IllegalArgumentException("Could not parse Avro schema string.", e);
+		}
+		return (TypeInformation<T>) convertToTypeInfo(schema);
+	}
+
+	private static TypeInformation<?> convertToTypeInfo(Schema schema) {
+		switch (schema.getType()) {
+			case RECORD:
+				final List<Schema.Field> fields = schema.getFields();
+
+				final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+				final String[] names = new String[fields.size()];
+				for (int i = 0; i < fields.size(); i++) {
+					final Schema.Field field = fields.get(i);
+					types[i] = convertToTypeInfo(field.schema());
+					names[i] = field.name();
+				}
+				return Types.ROW_NAMED(names, types);
+			case ENUM:
+				return Types.STRING;
+			case ARRAY:
+				// result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+				return Types.OBJECT_ARRAY(convertToTypeInfo(schema.getElementType()));
+			case MAP:
+				return Types.MAP(Types.STRING, convertToTypeInfo(schema.getValueType()));
+			case UNION:
+				final Schema actualSchema;
+				if (schema.getTypes().size() == 2 && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(1);
+				} else if (schema.getTypes().size() == 2 && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+					actualSchema = schema.getTypes().get(0);
+				} else if (schema.getTypes().size() == 1) {
+					actualSchema = schema.getTypes().get(0);
+				} else {
+					// use Kryo for serialization
+					return Types.GENERIC(Object.class);
+				}
+				return convertToTypeInfo(actualSchema);
+			case FIXED:
+				// logical decimal type
+				if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+					return Types.BIG_DEC;
+				}
+				// convert fixed size binary data to primitive byte arrays
+				return Types.PRIMITIVE_ARRAY(Types.BYTE);
+			case STRING:
+				// convert Avro's Utf8/CharSequence to String
+				return Types.STRING;
+			case BYTES:
+				// logical decimal type
+				if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+					return Types.BIG_DEC;
+				}
+				return Types.PRIMITIVE_ARRAY(Types.BYTE);
+			case INT:
+				// logical date and time type
+				final LogicalType logicalType = schema.getLogicalType();
+				if (logicalType == LogicalTypes.date()) {
+					return Types.SQL_DATE;
+				} else if (logicalType == LogicalTypes.timeMillis()) {
+					return Types.SQL_TIME;
+				}
+				return Types.INT;
+			case LONG:
+				// logical timestamp type
+				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+					return Types.SQL_TIMESTAMP;
+				}
+				return Types.LONG;
+			case FLOAT:
+				return Types.FLOAT;
+			case DOUBLE:
+				return Types.DOUBLE;
+			case BOOLEAN:
+				return Types.BOOLEAN;
+			case NULL:
+				return Types.VOID;
+		}
+		throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+	}
+}
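
A minimal usage sketch for the new converter (illustrative only; the schema string and class name are made up):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.types.Row;

public class AvroSchemaConverterExample {

	public static void main(String[] args) {
		final String schemaString =
			"{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
				+ "{\"name\": \"id\", \"type\": \"long\"},"
				+ "{\"name\": \"product\", \"type\": \"string\"},"
				+ "{\"name\": \"amount\", \"type\": [\"null\", \"double\"]}"
				+ "]}";

		// UNION[null, double] collapses to DOUBLE; Avro strings map to Flink's STRING
		final TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(schemaString);
		System.out.println(typeInfo); // a RowTypeInfo with the fields id, product, amount
	}
}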

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 5744abc..b871dbc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -33,6 +33,11 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.joda.time.Chronology;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -103,4 +108,69 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 			return sParser.parse(schemaAsString);
 		}
 	}
+
+	/**
+	 * Avro logical types use Joda-Time's LocalDate, but Kryo is unable to serialize it
+	 * properly (especially visible after calling toString()).
+	 */
+	public static class JodaLocalDateSerializer extends Serializer<LocalDate> {
+
+		public JodaLocalDateSerializer() {
+			setImmutable(true);
+		}
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalDate localDate) {
+			output.writeInt(localDate.getYear());
+			output.writeInt(localDate.getMonthOfYear());
+			output.writeInt(localDate.getDayOfMonth());
+
+			final Chronology chronology = localDate.getChronology();
+			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+				throw new RuntimeException("Unsupported chronology: " + chronology);
+			}
+		}
+
+		@Override
+		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> aClass) {
+			final int y = input.readInt();
+			final int m = input.readInt();
+			final int d = input.readInt();
+
+			return new LocalDate(
+				y,
+				m,
+				d,
+				null);
+		}
+	}
+
+	/**
+	 * Avro logical types use Joda-Time's LocalTime, but Kryo is unable to serialize it
+	 * properly (especially visible after calling toString()).
+	 */
+	public static class JodaLocalTimeSerializer extends Serializer<LocalTime> {
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalTime object) {
+			final int time = object.getMillisOfDay();
+			output.writeInt(time, true);
+
+			final Chronology chronology = object.getChronology();
+			if (chronology != null && chronology != ISOChronology.getInstanceUTC()) {
+				throw new RuntimeException("Unsupported chronology: " + chronology);
+			}
+		}
+
+		@Override
+		public LocalTime read(Kryo kryo, Input input, Class<LocalTime> type) {
+			final int time = input.readInt(true);
+			return new LocalTime(time, ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
+		}
+
+		@Override
+		public LocalTime copy(Kryo kryo, LocalTime original) {
+			return new LocalTime(original);
+		}
+	}
 }
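
If a job needs Joda's LocalDate/LocalTime to round-trip through Kryo, the serializers above can also be registered explicitly with the ExecutionConfig; a minimal sketch (class name made up, registration shown purely for illustration):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;

public class JodaKryoRegistrationSketch {

	public static void main(String[] args) {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// tell Kryo to use the custom serializers instead of its generic field-based handling
		env.getConfig().registerTypeWithKryoSerializer(
			LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
		env.getConfig().registerTypeWithKryoSerializer(
			LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
	}
}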

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
index f07a22f..611d714 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
@@ -20,14 +20,15 @@ package org.apache.flink.table.descriptors;
 
 import org.apache.flink.util.Preconditions;
 
-import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
 
 /**
  * Format descriptor for Apache Avro records.
  */
 public class Avro extends FormatDescriptor {
 
-	private Class<? extends SpecificRecordBase> recordClass;
+	private Class<? extends SpecificRecord> recordClass;
+	private String avroSchema;
 
 	/**
 	 * Format descriptor for Apache Avro records.
@@ -37,17 +38,28 @@ public class Avro extends FormatDescriptor {
 	}
 
 	/**
-	 * Sets the class of the Avro specific record. Required.
+	 * Sets the class of the Avro specific record.
 	 *
 	 * @param recordClass class of the Avro record.
 	 */
-	public Avro recordClass(Class<? extends SpecificRecordBase> recordClass) {
+	public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
 		Preconditions.checkNotNull(recordClass);
 		this.recordClass = recordClass;
 		return this;
 	}
 
 	/**
+	 * Sets the Avro schema for specific or generic Avro records.
+	 *
+	 * @param avroSchema Avro schema string
+	 */
+	public Avro avroSchema(String avroSchema) {
+		Preconditions.checkNotNull(avroSchema);
+		this.avroSchema = avroSchema;
+		return this;
+	}
+
+	/**
 	 * Internal method for format properties conversion.
 	 */
 	@Override
@@ -55,5 +67,8 @@ public class Avro extends FormatDescriptor {
 		if (null != recordClass) {
 			properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
 		}
+		if (null != avroSchema) {
+			properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
+		}
 	}
 }
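
A minimal sketch of the two ways to configure the descriptor, which the AvroValidator change below makes mutually exclusive (the schema string is made up, and User refers to the generated record class used in the tests):

import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.table.descriptors.Avro;

public class AvroDescriptorSketch {

	public static void main(String[] args) {
		// either: derive the format from a generated Avro specific record class
		final Avro fromClass = new Avro()
			.recordClass(User.class);

		// or: describe specific or generic records with an Avro schema string
		final Avro fromSchema = new Avro()
			.avroSchema(
				"{\"type\": \"record\", \"name\": \"Message\", \"fields\": ["
					+ "{\"name\": \"body\", \"type\": \"string\"}"
					+ "]}");

		System.out.println(fromClass + " / " + fromSchema);
	}
}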

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
index 8a72abf..c66dcc7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.table.api.ValidationException;
+
 /**
  * Validator for {@link Avro}.
  */
@@ -25,10 +27,21 @@ public class AvroValidator extends FormatDescriptorValidator {
 
 	public static final String FORMAT_TYPE_VALUE = "avro";
 	public static final String FORMAT_RECORD_CLASS = "format.record-class";
+	public static final String FORMAT_AVRO_SCHEMA = "format.avro-schema";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
 		super.validate(properties);
-		properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+		final boolean hasRecordClass = properties.containsKey(FORMAT_RECORD_CLASS);
+		final boolean hasAvroSchema = properties.containsKey(FORMAT_AVRO_SCHEMA);
+		if (hasRecordClass && hasAvroSchema) {
+			throw new ValidationException("A definition of both a schema and Avro schema is not allowed.");
+		} else if (hasRecordClass) {
+			properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+		} else if (hasAvroSchema) {
+			properties.validateString(FORMAT_AVRO_SCHEMA, false, 1);
+		} else {
+			throw new ValidationException("A definition of an Avro specific record class or Avro schema is required.");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index caa6e0d..dd901d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
@@ -31,12 +32,18 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Assert;
 
 import java.io.File;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * IT cases for the {@link AvroOutputFormat}.
@@ -72,14 +79,14 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 
 		//output the data with AvroOutputFormat for specific user type
 		DataSet<User> specificUser = input.map(new ConvertToUser());
-		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<>(User.class);
 		avroOutputFormat.setCodec(Codec.SNAPPY); // FLINK-4771: use a codec
 		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
 		specificUser.write(avroOutputFormat, outputPath1);
 
 		//output the data with AvroOutputFormat for reflect user type
 		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
-		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+		reflectiveUser.write(new AvroOutputFormat<>(ReflectiveUser.class), outputPath2);
 
 		env.execute();
 	}
@@ -92,17 +99,17 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		if (file1.isDirectory()) {
 			output1 = file1.listFiles();
 			// check for avro ext in dir.
-			for (File avroOutput : output1) {
+			for (File avroOutput : Objects.requireNonNull(output1)) {
 				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
 			}
 		} else {
 			output1 = new File[] {file1};
 		}
-		List<String> result1 = new ArrayList<String>();
-		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		List<String> result1 = new ArrayList<>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<>(User.class);
 		for (File avroOutput : output1) {
 
-			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+			DataFileReader<User> dataFileReader1 = new DataFileReader<>(avroOutput, userDatumReader1);
 			while (dataFileReader1.hasNext()) {
 				User user = dataFileReader1.next();
 				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
@@ -120,10 +127,10 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		} else {
 			output2 = new File[] {file2};
 		}
-		List<String> result2 = new ArrayList<String>();
-		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-		for (File avroOutput : output2) {
-			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+		List<String> result2 = new ArrayList<>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<>(ReflectiveUser.class);
+		for (File avroOutput : Objects.requireNonNull(output2)) {
+			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<>(avroOutput, userDatumReader2);
 			while (dataFileReader2.hasNext()) {
 				ReflectiveUser user = dataFileReader2.next();
 				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
@@ -138,7 +145,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 	private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
 
 		@Override
-		public User map(Tuple3<String, Integer, String> value) throws Exception {
+		public User map(Tuple3<String, Integer, String> value) {
 			User user = new User();
 			user.setName(value.f0);
 			user.setFavoriteNumber(value.f1);
@@ -148,6 +155,16 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 			user.setTypeArrayBoolean(Collections.emptyList());
 			user.setTypeEnum(Colors.BLUE);
 			user.setTypeMap(Collections.emptyMap());
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 			return user;
 		}
 	}
@@ -155,7 +172,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 	private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
 
 		@Override
-		public ReflectiveUser map(User value) throws Exception {
+		public ReflectiveUser map(User value) {
 			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
 		}
 	}
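
A worked sketch of the "// 20.00" values set above: Avro's decimal logical type stores only the unscaled value as big-endian two's-complement bytes, while the scale lives in the schema (class name made up):

import java.math.BigDecimal;
import java.util.Arrays;

public class DecimalEncodingSketch {

	public static void main(String[] args) {
		final BigDecimal twenty = BigDecimal.valueOf(2000, 2); // unscaled 2000, scale 2 -> 20.00
		final byte[] bytes = twenty.unscaledValue().toByteArray();
		System.out.println(Arrays.toString(bytes)); // [7, -48], i.e. 0x07D0 == 2000
	}
}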

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index b5ad564..3397b8e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 
 import org.apache.avro.Schema;
@@ -29,6 +30,9 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
@@ -38,6 +42,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -50,7 +56,7 @@ import static org.junit.Assert.fail;
 public class AvroOutputFormatTest {
 
 	@Test
-	public void testSetCodec() throws Exception {
+	public void testSetCodec() {
 		// given
 		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
 
@@ -64,7 +70,7 @@ public class AvroOutputFormatTest {
 	}
 
 	@Test
-	public void testSetCodecError() throws Exception {
+	public void testSetCodecError() {
 		// given
 		boolean error = false;
 		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
@@ -111,6 +117,7 @@ public class AvroOutputFormatTest {
 			// then
 			Object o = ois.readObject();
 			assertTrue(o instanceof AvroOutputFormat);
+			@SuppressWarnings("unchecked")
 			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
 			final AvroOutputFormat.Codec restoredCodec = (AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec");
 			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
@@ -162,6 +169,17 @@ public class AvroOutputFormatTest {
 			user.setTypeArrayBoolean(Collections.emptyList());
 			user.setTypeEnum(Colors.BLUE);
 			user.setTypeMap(Collections.emptyMap());
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+
 			outputFormat.writeRecord(user);
 		}
 		outputFormat.close();
@@ -189,7 +207,6 @@ public class AvroOutputFormatTest {
 		//cleanup
 		FileSystem fs = FileSystem.getLocalFileSystem();
 		fs.delete(outputPath, false);
-
 	}
 
 	private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {