Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/24 08:27:41 UTC

[1/2] flink git commit: [FLINK-8537] [table] Add a Kafka table source factory with Avro format support

Repository: flink
Updated Branches:
  refs/heads/master cdf4744ba -> 614b1e29e


http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
index 9fc7c88..8dbe0e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -37,4 +37,13 @@ final class AscendingTimestamps extends PeriodicWatermarkAssigner {
   }
 
   override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+
+  override def equals(obj: Any): Boolean = obj match {
+    case _: AscendingTimestamps => true
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    classOf[AscendingTimestamps].hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index dd71bd3..8695c61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -62,7 +62,19 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
 }
 
 /** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy
+final class PreserveWatermarks extends WatermarkStrategy {
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case _: PreserveWatermarks => true
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    classOf[PreserveWatermarks].hashCode()
+  }
+}
 object PreserveWatermarks {
   val INSTANCE: PreserveWatermarks = new PreserveWatermarks
 }

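The equals/hashCode overrides above give both stateless watermark strategies value-based equality, presumably so that configured table sources embedding them can be compared (for instance in the factory tests added by this commit). A minimal sketch of the resulting contract — not part of the commit — calling the Scala classes from Java:

import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

public class WatermarkStrategyEqualitySketch {
	public static void main(String[] args) {
		// Equality is defined purely by class, so two independently
		// created instances compare equal and hash identically.
		AscendingTimestamps a = new AscendingTimestamps();
		AscendingTimestamps b = new AscendingTimestamps();
		System.out.println(a.equals(b));                  // true
		System.out.println(a.hashCode() == b.hashCode()); // true
	}
}
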
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
deleted file mode 100644
index 1983554..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.batch
-
-import java.lang.{Boolean => JBoolean, Long => JLong}
-import java.util
-import java.util.Collections
-
-import org.apache.avro.util.Utf8
-import org.apache.flink.api.scala._
-import org.apache.flink.formats.avro.generated.{Address, Colors, Fixed16, User}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
-  * Tests for interoperability with Avro types.
-  */
-@RunWith(classOf[Parameterized])
-class AvroTypesITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsClusterTestBase(mode, configMode) {
-
-  @Test
-  def testAvroToRow(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = testData(env).toTable(tEnv)
-
-    val result = t.select('*)
-
-    val results = result.toDataSet[Row].collect()
-    val expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
-      "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
-      "blue,null,Charlie,[],[],false,1.337,RED," +
-      "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
-      "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
-      "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
-      "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAvroStringAccess(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = testData(env).toTable(tEnv)
-
-    val result = t.select('name)
-    val results = result.toDataSet[Utf8].collect()
-    val expected = "Charlie\n" +
-      "Terminator\n" +
-      "Whatever"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAvroObjectAccess(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = testData(env).toTable(tEnv)
-
-    val result = t
-      .filter('type_nested.isNotNull)
-      .select('type_nested.flatten()).as('city, 'num, 'state, 'street, 'zip)
-
-    val results = result.toDataSet[Address].collect()
-    val expected = AvroTypesITCase.USER_1.getTypeNested.toString
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAvroToAvro(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = testData(env).toTable(tEnv)
-
-    val result = t.select('*).toDataSet[User].collect()
-    val expected = AvroTypesITCase.USER_1 + "\n" +
-      AvroTypesITCase.USER_2 + "\n" +
-      AvroTypesITCase.USER_3
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-
-  private def testData(env: ExecutionEnvironment): DataSet[User] = {
-
-    val data = new mutable.MutableList[User]
-
-    data.+=(AvroTypesITCase.USER_1)
-    data.+=(AvroTypesITCase.USER_2)
-    data.+=(AvroTypesITCase.USER_3)
-
-    env.fromCollection(data)
-  }
-}
-
-object AvroTypesITCase {
-
-  val USER_1: User = User.newBuilder()
-    .setName("Charlie")
-    .setFavoriteColor("blue")
-    .setFavoriteNumber(null)
-    .setTypeBoolTest(false)
-    .setTypeDoubleTest(1.337d)
-    .setTypeNullTest(null)
-    .setTypeLongTest(1337L)
-    .setTypeArrayString(new util.ArrayList[CharSequence])
-    .setTypeArrayBoolean(new util.ArrayList[JBoolean]())
-    .setTypeNullableArray(null)
-    .setTypeEnum(Colors.RED)
-    .setTypeMap(new util.HashMap[CharSequence, JLong])
-    .setTypeFixed(null)
-    .setTypeUnion(null)
-    .setTypeNested(
-      Address.newBuilder
-      .setNum(42)
-      .setStreet("Bakerstreet")
-      .setCity("Berlin")
-      .setState("Berlin")
-      .setZip("12049")
-      .build)
-    .build
-
-  val USER_2: User = User.newBuilder()
-    .setName("Whatever")
-    .setFavoriteNumber(null)
-    .setFavoriteColor("black")
-    .setTypeLongTest(42L)
-    .setTypeDoubleTest(0.0)
-    .setTypeNullTest(null)
-    .setTypeBoolTest(true)
-    .setTypeArrayString(Collections.singletonList("hello"))
-    .setTypeArrayBoolean(Collections.singletonList(true))
-    .setTypeEnum(Colors.GREEN)
-    .setTypeMap(new util.HashMap[CharSequence, JLong])
-    .setTypeFixed(new Fixed16())
-    .setTypeUnion(null)
-    .setTypeNested(null)
-    .build()
-
-  val USER_3: User = User.newBuilder()
-    .setName("Terminator")
-    .setFavoriteNumber(null)
-    .setFavoriteColor("yellow")
-    .setTypeLongTest(1L)
-    .setTypeDoubleTest(0.0)
-    .setTypeNullTest(null)
-    .setTypeBoolTest(false)
-    .setTypeArrayString(Collections.singletonList("world"))
-    .setTypeArrayBoolean(Collections.singletonList(false))
-    .setTypeEnum(Colors.GREEN)
-    .setTypeMap(new util.HashMap[CharSequence, JLong])
-    .setTypeFixed(new Fixed16())
-    .setTypeUnion(null)
-    .setTypeNested(null)
-    .build()
-}


[2/2] flink git commit: [FLINK-8537] [table] Add a Kafka table source factory with Avro format support

Posted by tw...@apache.org.
[FLINK-8537] [table] Add a Kafka table source factory with Avro format support

This closes #5610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614b1e29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614b1e29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614b1e29

Branch: refs/heads/master
Commit: 614b1e29eefd37fa22e7c882bc243751c6897b90
Parents: cdf4744
Author: Xingcan Cui <xi...@gmail.com>
Authored: Thu Mar 1 19:21:05 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Apr 24 10:23:48 2018 +0200

----------------------------------------------------------------------
 .../kafka/Kafka010AvroTableSourceFactory.java   |  37 +++
 .../kafka/Kafka010JsonTableSourceFactory.java   |   2 +-
 ...pache.flink.table.sources.TableSourceFactory |   1 +
 .../Kafka010AvroTableSourceFactoryTest.java     |  37 +++
 .../kafka/Kafka011AvroTableSourceFactory.java   |  37 +++
 .../kafka/Kafka011JsonTableSourceFactory.java   |   2 +-
 ...pache.flink.table.sources.TableSourceFactory |   1 +
 .../Kafka011AvroTableSourceFactoryTest.java     |  37 +++
 .../kafka/Kafka08AvroTableSourceFactory.java    |  37 +++
 .../kafka/Kafka08JsonTableSourceFactory.java    |   2 +-
 ...pache.flink.table.sources.TableSourceFactory |   1 +
 .../Kafka08AvroTableSourceFactoryTest.java      |  37 +++
 .../kafka/Kafka09AvroTableSourceFactory.java    |  36 +++
 .../kafka/Kafka09JsonTableSourceFactory.java    |   2 +-
 ...pache.flink.table.sources.TableSourceFactory |   1 +
 .../Kafka09AvroTableSourceFactoryTest.java      |  37 +++
 .../connectors/kafka/KafkaAvroTableSource.java  |  73 ++----
 .../kafka/KafkaAvroTableSourceFactory.java      |  81 +++++++
 .../kafka/KafkaJsonTableSourceFactory.java      | 204 +++-------------
 .../kafka/KafkaTableSourceFactory.java          | 233 +++++++++++++++++++
 .../KafkaAvroTableSourceFactoryTestBase.java    | 123 ++++++++++
 flink-formats/flink-avro/pom.xml                |  22 +-
 .../typeutils/AvroRecordClassConverter.java     |  81 +++++++
 .../apache/flink/table/descriptors/Avro.java    |  59 +++++
 .../flink/table/descriptors/AvroValidator.java  |  34 +++
 .../flink/table/descriptors/AvroTest.java       |  64 +++++
 .../table/runtime/batch/AvroTypesITCase.java    | 203 ++++++++++++++++
 flink-libraries/flink-table/pom.xml             |  15 --
 .../sources/tsextractors/ExistingField.scala    |   2 +-
 .../tsextractors/StreamRecordTimestamp.scala    |   9 +
 .../wmstrategies/AscendingTimestamps.scala      |   9 +
 .../wmstrategies/watermarkStrategies.scala      |  14 +-
 .../table/runtime/batch/AvroTypesITCase.scala   | 188 ---------------
 33 files changed, 1288 insertions(+), 433 deletions(-)
----------------------------------------------------------------------

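For orientation (not part of the commit): a TableSourceFactory is selected and configured from a flat map of string properties. The sketch below illustrates such a map for the new Kafka/Avro source; the literal keys and values are assumptions inferred from the validator constants referenced in the diffs (CONNECTOR_TOPIC, CONNECTOR_VERSION_VALUE_010, AvroValidator.FORMAT_RECORD_CLASS, ...), not copied from the source.

import java.util.HashMap;
import java.util.Map;

public class KafkaAvroPropertiesSketch {
	public static void main(String[] args) {
		// Hypothetical property map; the real key strings live in
		// KafkaValidator and AvroValidator.
		Map<String, String> props = new HashMap<>();
		props.put("connector.type", "kafka");
		props.put("connector.version", "0.10");
		props.put("connector.topic", "user-events");
		props.put("format.type", "avro");
		props.put("format.record-class",
				"org.apache.flink.formats.avro.generated.User");
		props.forEach((k, v) -> System.out.println(k + "=" + v));
	}
}
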

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
new file mode 100644
index 0000000..3972c3f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010AvroTableSource}.
+ */
+public class Kafka010AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+	@Override
+	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+		return new Kafka010AvroTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return CONNECTOR_VERSION_VALUE_010;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
index c639a44..6c128b0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
 public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
 	@Override
-	protected KafkaJsonTableSource.Builder createBuilder() {
+	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
 		return new Kafka010JsonTableSource.Builder();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 9ef54fc..cf10939 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSourceFactory

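The service file above is a plain Java ServiceLoader registration: each line names a factory implementation to instantiate. A minimal discovery sketch (not Flink code), assuming the listed factories are on the classpath:

import java.util.ServiceLoader;

import org.apache.flink.table.sources.TableSourceFactory;

public class FactoryDiscoverySketch {
	public static void main(String[] args) {
		// ServiceLoader reads every META-INF/services/org.apache.flink.table.sources.TableSourceFactory
		// resource on the classpath and instantiates the classes listed there.
		for (TableSourceFactory<?> factory : ServiceLoader.load(TableSourceFactory.class)) {
			System.out.println(factory.getClass().getName());
		}
	}
}
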
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..d78bfbd
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+
+/**
+ * Tests for {@link Kafka010AvroTableSourceFactory}.
+ */
+public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+	@Override
+	protected String version() {
+		return CONNECTOR_VERSION_VALUE_010;
+	}
+
+	@Override
+	protected KafkaAvroTableSource.Builder builder() {
+		return Kafka010AvroTableSource.builder();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
new file mode 100644
index 0000000..a959983
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011AvroTableSource}.
+ */
+public class Kafka011AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+	@Override
+	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+		return new Kafka011AvroTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return CONNECTOR_VERSION_VALUE_011;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
index 6745bb2..53bf7be 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
 public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
 	@Override
-	protected KafkaJsonTableSource.Builder createBuilder() {
+	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
 		return new Kafka011JsonTableSource.Builder();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 75135e5..f6825ad 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..787a525
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+
+/**
+ * Tests for {@link Kafka011AvroTableSourceFactory}.
+ */
+public class Kafka011AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+	@Override
+	protected String version() {
+		return CONNECTOR_VERSION_VALUE_011;
+	}
+
+	@Override
+	protected KafkaAvroTableSource.Builder builder() {
+		return Kafka011AvroTableSource.builder();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
new file mode 100644
index 0000000..aefc4db
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08AvroTableSource}.
+ */
+public class Kafka08AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+
+	@Override
+	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+		return new Kafka08AvroTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return CONNECTOR_VERSION_VALUE_08;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
index 2da805a..13e0d57 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
 public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
 	@Override
-	protected KafkaJsonTableSource.Builder createBuilder() {
+	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
 		return new Kafka08JsonTableSource.Builder();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 9092955..7c45718 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..f367636
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+
+/**
+ * Tests for {@link Kafka08AvroTableSourceFactory}.
+ */
+public class Kafka08AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+	@Override
+	protected String version() {
+		return CONNECTOR_VERSION_VALUE_08;
+	}
+
+	@Override
+	protected KafkaAvroTableSource.Builder builder() {
+		return Kafka08AvroTableSource.builder();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
new file mode 100644
index 0000000..0cbe2bf
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09AvroTableSource}.
+ */
+public class Kafka09AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
+	@Override
+	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
+		return new Kafka09AvroTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return CONNECTOR_VERSION_VALUE_09;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
index 9207426..7363282 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
@@ -26,7 +26,7 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSIO
 public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
 	@Override
-	protected KafkaJsonTableSource.Builder createBuilder() {
+	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
 		return new Kafka09JsonTableSource.Builder();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index 2f38bd0..4717da9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
new file mode 100644
index 0000000..3078373
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+
+/**
+ * Tests for {@link Kafka09AvroTableSourceFactory}.
+ */
+public class Kafka09AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
+
+	@Override
+	protected String version() {
+		return CONNECTOR_VERSION_VALUE_09;
+	}
+
+	@Override
+	protected KafkaAvroTableSource.Builder builder() {
+		return Kafka09AvroTableSource.builder();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 1587798..7828a1c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -20,25 +20,17 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -72,7 +64,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
 			topic,
 			properties,
 			schema,
-			convertToRowTypeInformation(avroRecordClass));
+			AvroRecordClassConverter.convert(avroRecordClass));
 
 		this.avroRecordClass = avroRecordClass;
 	}
@@ -92,6 +84,27 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
 		return new AvroRowDeserializationSchema(avroRecordClass);
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaAvroTableSource)) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		final KafkaAvroTableSource that = (KafkaAvroTableSource) o;
+		return Objects.equals(avroRecordClass, that.avroRecordClass) &&
+				Objects.equals(fieldMapping, that.fieldMapping);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), avroRecordClass, fieldMapping);
+	}
+
 	//////// SETTERS FOR OPTIONAL PARAMETERS
 
 	/**
@@ -106,44 +119,6 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
 	//////// HELPER METHODS
 
 	/**
-	 * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	@SuppressWarnings("unchecked")
-	private static <T extends SpecificRecordBase> TypeInformation<Row> convertToRowTypeInformation(Class<T> avroClass) {
-		final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
-		// determine schema to retrieve deterministic field order
-		final Schema schema = SpecificData.get().getSchema(avroClass);
-		return (TypeInformation<Row>) convertToTypeInformation(avroTypeInfo, schema);
-	}
-
-	/**
-	 * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
-	 * Replaces generic Utf8 with basic String type information.
-	 */
-	private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
-		if (schema.getType() == Schema.Type.RECORD) {
-			final List<Schema.Field> fields = schema.getFields();
-			final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
-
-			final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
-			final String[] names = new String[fields.size()];
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
-				names[i] = field.name();
-			}
-			return new RowTypeInfo(types, names);
-		} else if (extracted instanceof GenericTypeInfo<?>) {
-			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
-			if (genericTypeInfo.getTypeClass() == Utf8.class) {
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			}
-		}
-		return extracted;
-	}
-
-	/**
 	 * Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
 	 * KafkaAvroTableSource.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
new file mode 100644
index 0000000..1401914
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.AvroValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Factory for creating configured instances of {@link KafkaAvroTableSource}.
+ */
+public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactory {
+
+	@Override
+	protected String formatType() {
+		return AvroValidator.FORMAT_TYPE_VALUE;
+	}
+
+	@Override
+	protected int formatPropertyVersion() {
+		return 1;
+	}
+
+	@Override
+	protected List<String> formatProperties() {
+		return Collections.singletonList(AvroValidator.FORMAT_RECORD_CLASS);
+	}
+
+	@Override
+	protected FormatDescriptorValidator formatValidator() {
+		return new AvroValidator();
+	}
+
+	@Override
+	protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
+		KafkaAvroTableSource.Builder builder = createKafkaAvroBuilder();
+
+		// Avro format schema
+		final Class<? extends SpecificRecordBase> avroRecordClass =
+				params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
+		builder.forAvroRecordClass(avroRecordClass);
+		final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroRecordClassConverter.convert(avroRecordClass));
+
+		// field mapping
+		final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
+		builder.withTableToAvroMapping(mapping);
+
+		return builder;
+	}
+
+	/**
+	 * Creates a version-specific {@link KafkaAvroTableSource.Builder}.
+	 */
+	protected abstract KafkaAvroTableSource.Builder createKafkaAvroBuilder();
+}

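A usage sketch (not part of the commit) of the conversion used above: AvroRecordClassConverter derives row type information from an Avro specific record class, and the factory turns that into the format schema. The generated User record from the flink-avro tests is assumed here purely for illustration:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public class AvroFormatSchemaSketch {
	public static void main(String[] args) {
		// Deterministic field order, with Avro's Utf8 mapped to String.
		TypeInformation<Row> rowType = AvroRecordClassConverter.convert(User.class);
		TableSchema formatSchema = TableSchema.fromTypeInfo(rowType);
		System.out.println(formatSchema);
	}
}
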
http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
index 2897314..9898569 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
@@ -20,219 +20,77 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.json.JsonSchemaConverter;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
 import org.apache.flink.table.descriptors.JsonValidator;
-import org.apache.flink.table.descriptors.KafkaValidator;
 import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
-import org.apache.flink.types.Row;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
 
 /**
  * Factory for creating configured instances of {@link KafkaJsonTableSource}.
  */
-public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
+public abstract class KafkaJsonTableSourceFactory extends KafkaTableSourceFactory {
 
 	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
-		context.put(CONNECTOR_VERSION(), kafkaVersion());
-
-		context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE); // json format
-
-		context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
-		context.put(FORMAT_PROPERTY_VERSION(), "1");
-
-		return context;
+	protected String formatType() {
+		return JsonValidator.FORMAT_TYPE_VALUE;
 	}
 
 	@Override
-	public List<String> supportedProperties() {
-		List<String> properties = new ArrayList<>();
-
-		// kafka
-		properties.add(CONNECTOR_TOPIC);
-		properties.add(CONNECTOR_PROPERTIES);
-		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
-		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
-		properties.add(CONNECTOR_STARTUP_MODE);
-		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
-		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
-
-		// json format
-		properties.add(FORMAT_JSON_SCHEMA);
-		properties.add(FORMAT_SCHEMA);
-		properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
-		properties.add(FORMAT_DERIVE_SCHEMA());
-
-		// schema
-		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
-		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
-		properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
-
-		// time attributes
-		properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
-
-		return properties;
+	protected int formatPropertyVersion() {
+		return 1;
 	}
 
 	@Override
-	public TableSource<Row> create(Map<String, String> properties) {
-		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
-
-		// validate
-		new SchemaValidator(true).validate(params);
-		new KafkaValidator().validate(params);
-		new JsonValidator().validate(params);
-
-		// build
-		final KafkaJsonTableSource.Builder builder = createBuilder();
-
-		// topic
-		final String topic = params.getString(CONNECTOR_TOPIC);
-		builder.forTopic(topic);
-
-		// properties
-		final Properties props = new Properties();
-		final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
-			CONNECTOR_PROPERTIES,
-			Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
-		propsList.forEach(kv -> props.put(
-			params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
-			params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
-		));
-		builder.withKafkaProperties(props);
-
-		// startup mode
-		params
-			.getOptionalString(CONNECTOR_STARTUP_MODE)
-			.ifPresent(startupMode -> {
-				switch (startupMode) {
-
-				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
-					builder.fromEarliest();
-					break;
-
-				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
-					builder.fromLatest();
-					break;
-
-				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
-					builder.fromGroupOffsets();
-					break;
+	protected List<String> formatProperties() {
+		return Arrays.asList(
+			JsonValidator.FORMAT_JSON_SCHEMA,
+			JsonValidator.FORMAT_SCHEMA,
+			JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD,
+			FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
+	}
 
-				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
-					final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+	@Override
+	protected FormatDescriptorValidator formatValidator() {
+		return new JsonValidator();
+	}
 
-					final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
-						CONNECTOR_SPECIFIC_OFFSETS,
-						Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-					offsetList.forEach(kv -> {
-						final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
-						final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-						final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
-						offsetMap.put(topicPartition, offset);
-					});
-					builder.fromSpecificOffsets(offsetMap);
-					break;
-				}
-			});
+	@Override
+	protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
+		final KafkaJsonTableSource.Builder builder = createKafkaJsonBuilder();
 
 		// missing field
-		params.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).ifPresent(builder::failOnMissingField);
+		params.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
+				.ifPresent(builder::failOnMissingField);
 
 		// json schema
 		final TableSchema formatSchema;
-		if (params.containsKey(FORMAT_SCHEMA)) {
-			final TypeInformation<?> info = params.getType(FORMAT_SCHEMA);
+		if (params.containsKey(JsonValidator.FORMAT_SCHEMA)) {
+			final TypeInformation<?> info = params.getType(JsonValidator.FORMAT_SCHEMA);
 			formatSchema = TableSchema.fromTypeInfo(info);
-		} else if (params.containsKey(FORMAT_JSON_SCHEMA)) {
-			final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(FORMAT_JSON_SCHEMA));
+		} else if (params.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
+			final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(JsonValidator.FORMAT_JSON_SCHEMA));
 			formatSchema = TableSchema.fromTypeInfo(info);
 		} else {
 			formatSchema = SchemaValidator.deriveFormatFields(params);
 		}
 		builder.forJsonSchema(formatSchema);
 
-		// schema
-		final TableSchema schema = params.getTableSchema(SCHEMA());
-		builder.withSchema(schema);
-
-		// proctime
-		SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute);
-
-		// rowtime
-		final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params);
-		if (descriptors.size() > 1) {
-			throw new TableException("More than one rowtime attribute is not supported yet.");
-		} else if (descriptors.size() == 1) {
-			final RowtimeAttributeDescriptor desc = descriptors.get(0);
-			builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy());
-		}
-
 		// field mapping
 		final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
 		builder.withTableToJsonMapping(mapping);
 
-		return builder.build();
+		return builder;
 	}
 
-	protected abstract KafkaJsonTableSource.Builder createBuilder();
+	/**
+	 * Creates a version-specific {@link KafkaJsonTableSource.Builder}.
+	 */
+	protected abstract KafkaJsonTableSource.Builder createKafkaJsonBuilder();
 
-	protected abstract String kafkaVersion();
 }
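
For orientation, a version-specific subclass now only needs to supply the two
remaining hooks. A minimal sketch (class and builder names are assumptions for
illustration, not part of this commit):

	/**
	 * Hypothetical factory for one Kafka version; all format handling
	 * is inherited from KafkaJsonTableSourceFactory above.
	 */
	public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

		@Override
		protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
			// assumed version-specific builder entry point
			return Kafka010JsonTableSource.builder();
		}

		@Override
		protected String kafkaVersion() {
			return "0.10"; // assumed version string matched by the Kafka descriptor
		}
	}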

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
new file mode 100644
index 0000000..6aaef9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link KafkaTableSource}.
+ */
+abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+		context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
+
+		context.put(FORMAT_TYPE(), formatType()); // format
+
+		context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+		context.put(FORMAT_PROPERTY_VERSION(), String.valueOf(formatPropertyVersion()));
+
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		// kafka
+		properties.add(CONNECTOR_TOPIC);
+		properties.add(CONNECTOR_PROPERTIES);
+		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
+		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
+		properties.add(CONNECTOR_STARTUP_MODE);
+		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+
+		// schema
+		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+		properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
+
+		// time attributes
+		properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
+
+		properties.addAll(formatProperties());
+
+		return properties;
+	}
+
+	@Override
+	public TableSource<Row> create(Map<String, String> properties) {
+		final DescriptorProperties params = new DescriptorProperties(true);
+		params.putProperties(properties);
+
+		// validate
+		new SchemaValidator(true).validate(params);
+		new KafkaValidator().validate(params);
+		formatValidator().validate(params);
+
+		// build
+		final KafkaTableSource.Builder builder = createBuilderWithFormat(params);
+
+		// topic
+		final String topic = params.getString(CONNECTOR_TOPIC);
+		builder.forTopic(topic);
+
+		// properties
+		final Properties props = new Properties();
+		final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
+			CONNECTOR_PROPERTIES,
+			Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
+		propsList.forEach(kv -> props.put(
+			params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+			params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+		));
+		builder.withKafkaProperties(props);
+
+		// startup mode
+		params
+			.getOptionalString(CONNECTOR_STARTUP_MODE)
+			.ifPresent(startupMode -> {
+				switch (startupMode) {
+
+				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
+					builder.fromEarliest();
+					break;
+
+				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
+					builder.fromLatest();
+					break;
+
+				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+					builder.fromGroupOffsets();
+					break;
+
+				case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+					final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+
+					final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
+						CONNECTOR_SPECIFIC_OFFSETS,
+						Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+					offsetList.forEach(kv -> {
+						final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+						final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+						final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+						offsetMap.put(topicPartition, offset);
+					});
+					builder.fromSpecificOffsets(offsetMap);
+					break;
+				}
+			});
+
+		// schema
+		final TableSchema schema = params.getTableSchema(SCHEMA());
+		builder.withSchema(schema);
+
+		// proctime
+		SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute);
+
+		// rowtime
+		final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params);
+		if (descriptors.size() > 1) {
+			throw new TableException("More than one rowtime attribute is not supported yet.");
+		} else if (descriptors.size() == 1) {
+			final RowtimeAttributeDescriptor desc = descriptors.get(0);
+			builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy());
+		}
+
+		return builder.build();
+	}
+
+	/**
+	 * Returns the format type string (e.g., "json").
+	 */
+	protected abstract String formatType();
+
+	/**
+	 * Returns the format property version.
+	 */
+	protected abstract int formatPropertyVersion();
+
+	/**
+	 * Returns the properties of the format.
+	 */
+	protected abstract List<String> formatProperties();
+
+	/**
+	 * Returns the validator for the format.
+	 */
+	protected abstract FormatDescriptorValidator formatValidator();
+
+	/**
+	 * Returns the Kafka version.
+	 */
+	protected abstract String kafkaVersion();
+
+	/**
+	 * Creates a builder with all format-related configurations already set.
+	 */
+	protected abstract KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params);
+
+}
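
As a rough sketch of the flat string properties that drive this factory
(keys follow the validator constants used above; exact values are assumptions):

	Map<String, String> props = new HashMap<>();
	// context, matched against requiredContext()
	props.put("connector.type", "kafka");
	props.put("connector.version", "0.10");          // kafkaVersion()
	props.put("connector.property-version", "1");
	props.put("format.type", "json");                // formatType()
	props.put("format.property-version", "1");       // formatPropertyVersion()
	// payload, must be covered by supportedProperties()
	props.put("connector.topic", "test-topic");
	props.put("connector.startup-mode", "earliest-offset");
	props.put("schema.0.name", "field1");
	props.put("schema.0.type", "VARCHAR");

	// given a concrete, version-specific factory instance (the class above is abstract):
	TableSource<Row> source = factory.create(props);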

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
new file mode 100644
index 0000000..9552559
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.Avro;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaAvroTableSourceFactory}.
+ */
+public abstract class KafkaAvroTableSourceFactoryTestBase {
+
+	private static final String TOPIC = "test-topic";
+
+	protected abstract String version();
+
+	protected abstract KafkaAvroTableSource.Builder builder();
+
+	@Test
+	public void testTableSourceFromAvroSchema() {
+		testTableSource(new Avro().recordClass(Address.class));
+	}
+
+	private void testTableSource(FormatDescriptor format) {
+		// construct table source using a builder
+
+		final Map<String, String> tableAvroMapping = new HashMap<>();
+		tableAvroMapping.put("a_street", "street");
+		tableAvroMapping.put("street", "street");
+		tableAvroMapping.put("b_city", "city");
+		tableAvroMapping.put("city", "city");
+		tableAvroMapping.put("c_state", "state");
+		tableAvroMapping.put("state", "state");
+		tableAvroMapping.put("zip", "zip");
+		tableAvroMapping.put("num", "num");
+
+		final Properties props = new Properties();
+		props.put("group.id", "test-group");
+		props.put("bootstrap.servers", "localhost:1234");
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+
+		KafkaAvroTableSource.Builder builder = builder();
+
+		builder.forAvroRecordClass(Address.class)
+				.withTableToAvroMapping(tableAvroMapping);
+
+		final KafkaTableSource builderSource = builder
+				.withKafkaProperties(props)
+				.forTopic(TOPIC)
+				.fromSpecificOffsets(specificOffsets)
+				.withSchema(
+						TableSchema.builder()
+								.field("a_street", Types.STRING)
+								.field("b_city", Types.STRING)
+								.field("c_state", Types.STRING)
+								.field("zip", Types.STRING)
+								.field("proctime", Types.SQL_TIMESTAMP)
+								.build())
+				.withProctimeAttribute("proctime")
+				.build();
+
+		// construct table source using descriptors and table source factory
+
+		final Map<Integer, Long> offsets = new HashMap<>();
+		offsets.put(0, 100L);
+		offsets.put(1, 123L);
+
+		final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor(
+				new Kafka()
+						.version(version())
+						.topic(TOPIC)
+						.properties(props)
+						.startFromSpecificOffsets(offsets))
+				.addFormat(format)
+				.addSchema(
+						new Schema()
+								.field("a_street", Types.STRING).from("street")
+								.field("b_city", Types.STRING).from("city")
+								.field("c_state", Types.STRING).from("state")
+								.field("zip", Types.STRING)
+								.field("proctime", Types.SQL_TIMESTAMP).proctime());
+
+		final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
+
+		assertEquals(builderSource, factorySource);
+	}
+}
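
A concrete test for one Kafka version would then be a thin subclass. A sketch,
with assumed class names for illustration:

	/**
	 * Hypothetical concrete test; the base class above performs the
	 * builder-vs-factory comparison once these two hooks are provided.
	 */
	public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {

		@Override
		protected String version() {
			return "0.10"; // assumed version string
		}

		@Override
		protected KafkaAvroTableSource.Builder builder() {
			return Kafka010AvroTableSource.builder(); // assumed version-specific builder
		}
	}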

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 3458760..2a437f6 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -51,17 +51,35 @@ under the License.
 			<!-- managed version -->
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<!-- use a fixed Scala version so that this module does not itself depend on Scala -->
+			<artifactId>flink-table_2.11</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils-junit</artifactId>
+			<artifactId>flink-table_2.11</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
+			<type>test-jar</type>
 		</dependency>
 
 		<!-- To avoid having to have the 'flink-avro' project dependent on a particular
-			Scala version, we hard-refer the flink-test-utils_2.11 here -->
+			Scala version, we refer directly to flink-streaming-scala_2.11 here -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.11</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
new file mode 100644
index 0000000..b7b4871
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+
+import java.util.List;
+
+/**
+ * Utilities for Avro record class conversion.
+ */
+public class AvroRecordClassConverter {
+
+	private AvroRecordClassConverter() {
+		// private
+	}
+
+	/**
+	 * Converts the extracted AvroTypeInfo into a nested RowTypeInfo structure with deterministic field order.
+	 * Replaces the generic Utf8 type with basic String type information.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends SpecificRecordBase> TypeInformation<Row> convert(Class<T> avroClass) {
+		final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass);
+		// determine schema to retrieve deterministic field order
+		final Schema schema = SpecificData.get().getSchema(avroClass);
+		return (TypeInformation<Row>) convertType(avroTypeInfo, schema);
+	}
+
+	/**
+	 * Recursively converts the extracted type information into a nested RowTypeInfo structure with deterministic field order.
+	 * Replaces the generic Utf8 type with basic String type information.
+	 */
+	private static TypeInformation<?> convertType(TypeInformation<?> extracted, Schema schema) {
+		if (schema.getType() == Schema.Type.RECORD) {
+			final List<Schema.Field> fields = schema.getFields();
+			final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+			final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+			final String[] names = new String[fields.size()];
+			for (int i = 0; i < fields.size(); i++) {
+				final Schema.Field field = fields.get(i);
+				types[i] = convertType(avroTypeInfo.getTypeAt(field.name()), field.schema());
+				names[i] = field.name();
+			}
+			return new RowTypeInfo(types, names);
+		} else if (extracted instanceof GenericTypeInfo<?>) {
+			final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+			if (genericTypeInfo.getTypeClass() == Utf8.class) {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		}
+		return extracted;
+	}
+
+}
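
A minimal usage sketch, assuming the generated Address record used by the tests
in this commit:

	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.formats.avro.generated.Address;
	import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
	import org.apache.flink.types.Row;

	// Derives a RowTypeInfo whose field order follows the Avro schema;
	// Utf8-typed string fields are mapped to STRING_TYPE_INFO.
	TypeInformation<Row> rowType = AvroRecordClassConverter.convert(Address.class);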

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
new file mode 100644
index 0000000..f07a22f
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/Avro.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+/**
+ * Format descriptor for Apache Avro records.
+ */
+public class Avro extends FormatDescriptor {
+
+	private Class<? extends SpecificRecordBase> recordClass;
+
+	/**
+	 * Format descriptor for Apache Avro records.
+	 */
+	public Avro() {
+		super(AvroValidator.FORMAT_TYPE_VALUE, 1);
+	}
+
+	/**
+	 * Sets the class of the Avro specific record. Required.
+	 *
+	 * @param recordClass class of the Avro record.
+	 */
+	public Avro recordClass(Class<? extends SpecificRecordBase> recordClass) {
+		Preconditions.checkNotNull(recordClass);
+		this.recordClass = recordClass;
+		return this;
+	}
+
+	/**
+	 * Internal method for format properties conversion.
+	 */
+	@Override
+	public void addFormatProperties(DescriptorProperties properties) {
+		if (null != recordClass) {
+			properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
+		}
+	}
+}
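
To illustrate the property conversion, a small sketch (the resulting key is the
one asserted in AvroTest further down):

	Avro avro = new Avro().recordClass(User.class);

	DescriptorProperties props = new DescriptorProperties(true);
	avro.addFormatProperties(props);
	// adds: format.record-class -> org.apache.flink.formats.avro.generated.User
	// (format.type and format.property-version come from the FormatDescriptor base)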

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
new file mode 100644
index 0000000..8a72abf
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/table/descriptors/AvroValidator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+/**
+ * Validator for {@link Avro}.
+ */
+public class AvroValidator extends FormatDescriptorValidator {
+
+	public static final String FORMAT_TYPE_VALUE = "avro";
+	public static final String FORMAT_RECORD_CLASS = "format.record-class";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		properties.validateString(FORMAT_RECORD_CLASS, false, 1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
new file mode 100644
index 0000000..2345553
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link Avro} descriptor.
+ */
+public class AvroTest extends DescriptorTestBase {
+
+	@Test(expected = ValidationException.class)
+	public void testMissingRecordClass() {
+		removePropertyAndVerify(descriptors().get(0), "format.record-class");
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public List<Descriptor> descriptors() {
+		final Descriptor desc1 = new Avro().recordClass(User.class);
+		return Collections.singletonList(desc1);
+	}
+
+	@Override
+	public List<Map<String, String>> properties() {
+		final Map<String, String> props1 = new HashMap<>();
+		props1.put("format.type", "avro");
+		props1.put("format.property-version", "1");
+		props1.put("format.record-class", "org.apache.flink.formats.avro.generated.User");
+
+		return Collections.singletonList(props1);
+	}
+
+	@Override
+	public DescriptorValidator validator() {
+		return new AvroValidator();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
new file mode 100644
index 0000000..88c70c6
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for interoperability with Avro types.
+ */
+@RunWith(Parameterized.class)
+public class AvroTypesITCase extends TableProgramsClusterTestBase {
+
+	private static final User USER_1 = User.newBuilder()
+			.setName("Charlie")
+			.setFavoriteColor("blue")
+			.setFavoriteNumber(null)
+			.setTypeBoolTest(false)
+			.setTypeDoubleTest(1.337d)
+			.setTypeNullTest(null)
+			.setTypeLongTest(1337L)
+			.setTypeArrayString(new ArrayList<>())
+			.setTypeArrayBoolean(new ArrayList<>())
+			.setTypeNullableArray(null)
+			.setTypeEnum(Colors.RED)
+			.setTypeMap(new HashMap<>())
+			.setTypeFixed(null)
+			.setTypeUnion(null)
+			.setTypeNested(
+					Address.newBuilder()
+							.setNum(42)
+							.setStreet("Bakerstreet")
+							.setCity("Berlin")
+							.setState("Berlin")
+							.setZip("12049").build())
+			.build();
+
+	private static final User USER_2 = User.newBuilder()
+			.setName("Whatever")
+			.setFavoriteNumber(null)
+			.setFavoriteColor("black")
+			.setTypeLongTest(42L)
+			.setTypeDoubleTest(0.0)
+			.setTypeNullTest(null)
+			.setTypeBoolTest(true)
+			.setTypeArrayString(Collections.singletonList("hello"))
+			.setTypeArrayBoolean(Collections.singletonList(true))
+			.setTypeEnum(Colors.GREEN)
+			.setTypeMap(new HashMap<>())
+			.setTypeFixed(new Fixed16())
+			.setTypeUnion(null)
+			.setTypeNested(null)
+			.build();
+
+	private static final User USER_3 = User.newBuilder()
+			.setName("Terminator")
+			.setFavoriteNumber(null)
+			.setFavoriteColor("yellow")
+			.setTypeLongTest(1L)
+			.setTypeDoubleTest(0.0)
+			.setTypeNullTest(null)
+			.setTypeBoolTest(false)
+			.setTypeArrayString(Collections.singletonList("world"))
+			.setTypeArrayBoolean(Collections.singletonList(false))
+			.setTypeEnum(Colors.GREEN)
+			.setTypeMap(new HashMap<>())
+			.setTypeFixed(new Fixed16())
+			.setTypeUnion(null)
+			.setTypeNested(null)
+			.build();
+
+	private static TypeInformation<Row> rowType = Types.ROW(
+			Types.GENERIC(Utf8.class),
+			Types.INT,
+			Types.GENERIC(Utf8.class),
+			Types.GENERIC(List.class),
+			Types.GENERIC(List.class),
+			Types.GENERIC(Object.class),
+			Types.DOUBLE,
+			Types.ENUM(Colors.class),
+			Types.GENERIC(Fixed16.class),
+			Types.LONG,
+			Types.GENERIC(Map.class),
+			Types.POJO(Address.class),
+			Types.GENERIC(Object.class),
+			Types.GENERIC(List.class),
+			Types.GENERIC(Object.class)
+	);
+
+	public AvroTypesITCase(
+			TestExecutionMode executionMode,
+			TableConfigMode tableConfigMode) {
+		super(executionMode, tableConfigMode);
+	}
+
+	@Test
+	public void testAvroToRow() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tEnv.fromDataSet(testData(env));
+		Table result = t.select("*");
+
+		List<Row> results = tEnv.toDataSet(result, rowType).collect();
+		String expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
+				"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
+				"blue,null,Charlie,[],[],false,1.337,RED," +
+				"null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
+				"\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
+				"yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
+				"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null";
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAvroStringAccess() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tEnv.fromDataSet(testData(env));
+		Table result = t.select("name");
+		List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
+		String expected = "Charlie\n" +
+				"Terminator\n" +
+				"Whatever";
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAvroObjectAccess() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tEnv.fromDataSet(testData(env));
+		Table result = t
+				.filter("type_nested.isNotNull")
+				.select("type_nested.flatten()").as("city, num, state, street, zip");
+
+		List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
+		String expected = USER_1.getTypeNested().toString();
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAvroToAvro() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tEnv.fromDataSet(testData(env));
+		Table result = t.select("*");
+
+		List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
+		String expected = USER_1 + "\n" + USER_2 + "\n" + USER_3;
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	private DataSet<User> testData(ExecutionEnvironment env) {
+		List<User> data = new ArrayList<>(3);
+		data.add(USER_1);
+		data.add(USER_2);
+		data.add(USER_3);
+		return env.fromCollection(data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 395e5cb..d993f23 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -222,21 +222,6 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 866029b..6a2a418 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldRefere
   *
   * @param field The field to convert into a rowtime attribute.
   */
-class ExistingField(val field: String) extends TimestampExtractor {
+final class ExistingField(val field: String) extends TimestampExtractor {
 
   override def getArgumentFields: Array[String] = Array(field)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614b1e29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
index fcbd63f..659b13b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -42,6 +42,15 @@ final class StreamRecordTimestamp extends TimestampExtractor {
   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
     org.apache.flink.table.expressions.StreamRecordTimestamp()
   }
+
+  override def equals(obj: Any): Boolean = obj match {
+    case _: StreamRecordTimestamp => true
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    classOf[StreamRecordTimestamp].hashCode()
+  }
 }
 
 object StreamRecordTimestamp {