Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 01:52:13 UTC

[4/4] flink git commit: [FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.

[FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.

This closes #4638.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e92b663
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e92b663
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e92b663

Branch: refs/heads/master
Commit: 0e92b6632f35b69c62d7747f1cbaa3ee207fb235
Parents: 505d478
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 31 14:35:41 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 1 00:20:38 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   | 400 ++++++++++++++++---
 .../kafka/Kafka010AvroTableSource.java          |  83 ++++
 .../kafka/Kafka010JsonTableSource.java          | 104 ++++-
 .../connectors/kafka/Kafka010TableSource.java   |  16 +-
 .../kafka/Kafka010AvroTableSourceTest.java      |  13 +-
 .../kafka/Kafka010JsonTableSourceTest.java      |   9 +-
 .../kafka/Kafka011AvroTableSource.java          |  83 ++++
 .../kafka/Kafka011JsonTableSource.java          | 104 ++++-
 .../connectors/kafka/Kafka011TableSource.java   |  17 +-
 .../kafka/Kafka011AvroTableSourceTest.java      |  13 +-
 .../kafka/Kafka011JsonTableSourceTest.java      |   9 +-
 .../kafka/Kafka08AvroTableSource.java           |  83 ++++
 .../kafka/Kafka08JsonTableSource.java           | 104 ++++-
 .../connectors/kafka/Kafka08TableSource.java    |  16 +-
 .../kafka/Kafka08AvroTableSourceTest.java       |  12 +-
 .../kafka/Kafka08JsonTableSourceTest.java       |   9 +-
 .../kafka/Kafka09AvroTableSource.java           |  83 ++++
 .../kafka/Kafka09JsonTableSource.java           | 104 ++++-
 .../connectors/kafka/Kafka09TableSource.java    |  16 +-
 .../kafka/Kafka09AvroTableSourceTest.java       |  13 +-
 .../kafka/Kafka09JsonTableSourceTest.java       |   9 +-
 .../connectors/kafka/KafkaAvroTableSource.java  | 112 +++++-
 .../connectors/kafka/KafkaJsonTableSource.java  | 163 +++++++-
 .../connectors/kafka/KafkaTableSource.java      | 299 +++++++++++++-
 .../kafka/KafkaAvroTableSourceTestBase.java     | 186 +++++++++
 .../kafka/KafkaJsonTableSourceTestBase.java     | 121 ++++++
 .../kafka/KafkaTableSourceTestBase.java         | 196 ++++++---
 .../apache/flink/table/api/TableSchema.scala    |  22 +
 .../flink/table/codegen/CodeGenerator.scala     |  33 +-
 .../table/expressions/fieldExpression.scala     |  19 +-
 .../sql/StreamRecordTimestampSqlFunction.scala  |  38 ++
 .../plan/nodes/datastream/DataStreamScan.scala  |  20 +-
 .../datastream/StreamTableSourceScan.scala      |   1 +
 .../table/sources/DefinedFieldMapping.scala     |   1 +
 .../table/sources/definedTimeAttributes.scala   |   2 +
 .../table/sources/timestampExtractors.scala     |  77 ----
 .../sources/tsextractors/ExistingField.scala    |  68 ++++
 .../tsextractors/StreamRecordTimestamp.scala    |  46 +++
 .../tsextractors/TimestampExtractor.scala       |  32 ++
 .../table/sources/watermarkStrategies.scala     | 101 -----
 .../wmstrategies/AscendingTimestamps.scala      |  40 ++
 .../BoundedOutOfOrderTimestamps.scala           |  41 ++
 .../wmstrategies/watermarkStrategies.scala      |  62 +++
 .../validation/TableSourceValidationTest.scala  |   4 +-
 .../flink/table/utils/testTableSources.scala    |   4 +-
 45 files changed, 2520 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d5bb874..43542f3 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -38,106 +38,394 @@ Currently, Flink provides the `CsvTableSource` to read CSV files and a few table
 A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. See section on [defining a custom TableSource](#define-a-tablesource) for details.
 
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
-| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files.
-| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data.
-| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for Avro data.
-| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data.
-| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for Avro data.
-| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for JSON data.
-| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for Avro data.
-
-All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
+| `Kafka011AvroTableSource` | `flink-connector-kafka-0.11` | N | Y | A `TableSource` for Avro-encoded Kafka 0.11 topics.
+| `Kafka011JsonTableSource` | `flink-connector-kafka-0.11` | N | Y | A `TableSource` for flat JSON-encoded Kafka 0.11 topics.
+| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A `TableSource` for Avro-encoded Kafka 0.10 topics.
+| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A `TableSource` for flat JSON-encoded Kafka 0.10 topics.
+| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A `TableSource` for Avro-encoded Kafka 0.9 topics.
+| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A `TableSource` for flat JSON-encoded Kafka 0.9 topics.
+| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics.
+| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat JSON-encoded Kafka 0.8 topics.
+| `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files.
+
+All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
 
 {% top %}
 
 ### KafkaJsonTableSource
 
-To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
+A `KafkaJsonTableSource` ingests JSON-encoded messages from a Kafka topic. Currently, only JSON records with a flat (non-nested) schema are supported.
 
-  - `flink-connector-kafka-0.8` for Kafka 0.8,
-  - `flink-connector-kafka-0.9` for Kafka 0.9, or
-  - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
+A `KafkaJsonTableSource` is created and configured using a builder. The following example shows how to create a `KafkaJsonTableSource` with basic properties:
 
-You can then create the source as follows (example for Kafka 0.8):
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// specify JSON field names and types
-TypeInformation<Row> typeInfo = Types.ROW(
-  new String[] { "id", "name", "score" },
-  new TypeInformation<?>[] { Types.INT(), Types.STRING(), Types.DOUBLE() }
-);
-
-KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
-    kafkaTopic,
-    kafkaProperties,
-    typeInfo);
+// create builder
+TableSource source = Kafka010JsonTableSource.builder()
+  // set Kafka topic
+  .forTopic("sensors")
+  // set Kafka consumer properties
+  .withKafkaProperties(kafkaProps)
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())  
+    .field("temp", Types.DOUBLE())
+    .field("time", Types.SQL_TIMESTAMP()).build())
+  .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// specify JSON field names and types
-val typeInfo = Types.ROW(
-  Array("id", "name", "score"),
-  Array(Types.INT, Types.STRING, Types.DOUBLE)
-)
+// create builder
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // set Kafka topic
+  .forTopic("sensors")
+  // set Kafka consumer properties
+  .withKafkaProperties(kafkaProps)
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temp", Types.DOUBLE)
+    .field("time", Types.SQL_TIMESTAMP).build())
+  .build()
+{% endhighlight %}
+</div>
+</div>
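+
+The created `TableSource` can then be registered in a `TableEnvironment` and queried with the Table API or SQL. The following is a minimal Java sketch; the table name `Sensors` is just an example:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register the TableSource under the name "Sensors"
+tableEnv.registerTableSource("Sensors", source);
+
+// scan the registered table
+Table sensors = tableEnv.scan("Sensors");
+{% endhighlight %}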
+
+#### Optional Configuration
 
-val kafkaTableSource = new Kafka08JsonTableSource(
-    kafkaTopic,
-    kafkaProperties,
-    typeInfo)
+* **Time Attributes:** Please see the sections on [configuring a rowtime attribute](#configure-a-rowtime-attribute) and [configuring a processing time attribute](#configure-a-processing-time-attribute).
+
+* **Explicit JSON parse schema:** By default, JSON records are parsed using the field names and types of the table schema. You can also configure an explicit JSON parsing schema and provide a mapping from table schema fields to JSON fields, as shown in the following example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<String, String> mapping = new HashMap<>();
+mapping.put("sensorId", "id");
+mapping.put("temperature", "temp");
+
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())
+    .field("temperature", Types.DOUBLE()).build())
+  // set JSON parsing schema
+  .forJsonSchema(TableSchema.builder()
+    .field("id", Types.LONG())
+    .field("temp", Types.DOUBLE()).build())
+  // set mapping from table fields to JSON fields
+  .withTableToJsonMapping(mapping)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temperature", Types.DOUBLE).build())
+  // set JSON parsing schema
+  .forJsonSchema(TableSchema.builder()
+    .field("id", Types.LONG)
+    .field("temp", Types.DOUBLE).build())
+  // set mapping from table fields to JSON fields
+  .withTableToJsonMapping(Map(
+    "sensorId" -> "id", 
+    "temperature" -> "temp").asJava)
+  .build()
 {% endhighlight %}
 </div>
 </div>
 
-By default, a missing JSON field does not fail the source. You can configure this via:
+* **Missing Field Handling:** By default, a missing JSON field is set to `null`. You can enable strict JSON parsing that cancels the source (and the query) if a field is missing.
 
-```java
-// Fail on missing JSON field
-tableSource.setFailOnMissingField(true);
-```
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // configure missing field behavior
+  .failOnMissingField(true)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // configure missing field behavior
+  .failOnMissingField(true)
+  .build()
+{% endhighlight %}
+</div>
+</div>
 
 {% top %}
 
 ### KafkaAvroTableSource
 
-The `KafkaAvroTableSource` allows you to read Avro's `SpecificRecord` objects from Kafka.
+A `KafkaAvroTableSource` ingests Avro-encoded records from a Kafka topic.
+
+A `KafkaAvroTableSource` is created and configured using a builder. The following example shows how to create a `KafkaAvroTableSource` with basic properties:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// create builder
+TableSource source = Kafka010AvroTableSource.builder()
+  // set Kafka topic
+  .forTopic("sensors")
+  // set Kafka consumer properties
+  .withKafkaProperties(kafkaProps)
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())
+    .field("temp", Types.DOUBLE())
+    .field("time", Types.SQL_TIMESTAMP()).build())
+  // set class of Avro record
+  .forAvroRecordClass(SensorReading.class)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// create builder
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+  // set Kafka topic
+  .forTopic("sensors")
+  // set Kafka consumer properties
+  .withKafkaProperties(kafkaProps)
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temp", Types.DOUBLE)
+    .field("time", Types.SQL_TIMESTAMP).build())
+  // set class of Avro record
+  .forAvroRecordClass(classOf[SensorReading])
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+**NOTE:** The specified Avro record class must provide all fields of the table schema with a corresponding type.
+
+#### Optional Configuration
+
+* **Time Attributes:** Please see the sections on [configuring a rowtime attribute](#configure-a-rowtime-attribute) and [configuring a processing time attribute](#configure-a-processing-time-attribute).
+
+* **Explicit Schema Field to Avro Mapping:** By default, all fields of the table schema are mapped by name to fields of the Avro records. If the fields in the Avro records have different names, a mapping from table schema fields to Avro fields can be specified.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<String, String> mapping = new HashMap<>();
+mapping.put("sensorId", "id");
+mapping.put("temperature", "temp");
+
+TableSource source = Kafka010AvroTableSource.builder()
+  // ...
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())
+    .field("temperature", Types.DOUBLE()).build())
+  // set class of Avro record with fields [id, temp]
+  .forAvroRecordClass(SensorReading.class)
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(mapping)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+  // ...
+  // set Table schema
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temperature", Types.DOUBLE).build())
+  // set class of Avro record with fields [id, temp]
+  .forAvroRecordClass(classOf[SensorReading])
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(Map(
+    "sensorId" -> "id", 
+    "temperature" -> "temp").asJava)
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+### Configuring a Processing Time Attribute
 
-To use the Kafka Avro source, you have to add the Kafka connector dependency to your project:
+[Processing time attributes](streaming.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. 
 
-  - `flink-connector-kafka-0.8` for Kafka 0.8,
-  - `flink-connector-kafka-0.9` for Kafka 0.9, or
-  - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
+Batch queries support processing time attributes as well. However, processing time attributes are initialized with the wall-clock time of the table scan operator and keep this value throughout the query evaluation. 
 
-You can then create the source as follows (example for Kafka 0.8):
+A table schema field of type `SQL_TIMESTAMP` can be declared as a processing time attribute as shown in the following example.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// pass the generated Avro class to the TableSource
-Class<? extends SpecificRecord> clazz = MyAvroType.class; 
+TableSource source = Kafka010JsonTableSource.builder()
+  // ... 
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())  
+    .field("temp", Types.DOUBLE())
+    // field "ptime" is of type SQL_TIMESTAMP
+    .field("ptime", Types.SQL_TIMESTAMP()).build())
+  // declare "ptime" as processing time attribute
+  .withProctimeAttribute("ptime")
+  .build();
+{% endhighlight %}
+</div>
 
-KafkaAvroTableSource kafkaTableSource = new Kafka08AvroTableSource(
-    kafkaTopic,
-    kafkaProperties,
-    clazz);
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temp", Types.DOUBLE)
+    // field "ptime" is of type SQL_TIMESTAMP
+    .field("ptime", Types.SQL_TIMESTAMP).build())
+  // declare "ptime" as processing time attribute
+  .withProctimeAttribute("ptime")
+  .build()
+{% endhighlight %}
+</div>
+</div>
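+
+Once declared, the processing time attribute can be used like a regular timestamp field, for example to group records into time windows. The following Table API sketch assumes that the source was registered as `Sensors` in a `StreamTableEnvironment` named `tableEnv`:
+
+{% highlight java %}
+// compute the average temperature per sensor over 10-minute tumbling windows
+Table avgTemp = tableEnv.scan("Sensors")
+  .window(Tumble.over("10.minutes").on("ptime").as("w"))
+  .groupBy("w, sensorId")
+  .select("sensorId, w.end, temp.avg as avgTemp");
+{% endhighlight %}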
+
+{% top %}
+
+### Configuring a Rowtime Attribute
+
+[Rowtime attributes](streaming.html#event-time) are attributes of type `TIMESTAMP` that are handled in a unified way by stream and batch queries.
+
+A table schema field of type `SQL_TIMESTAMP` can be declared as a rowtime attribute by specifying
+
+* the name of the field, 
+* a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other attributes), and
+* a `WatermarkStrategy` that specifies how watermarks are generated for the rowtime attribute.
+
+The following example shows how to configure a rowtime attribute.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())
+    .field("temp", Types.DOUBLE())
+    // field "rtime" is of type SQL_TIMESTAMP
+    .field("rtime", Types.SQL_TIMESTAMP()).build())
+  .withRowtimeAttribute(
+    // "rtime" is rowtime attribute
+    "rtime",
+    // value of "rtime" is extracted from existing field with same name
+    new ExistingField("rtime"),
+    // values of "rtime" are at most out-of-order by 30 seconds
+    new BoundedOutOfOrderTimestamps(30000L))
+  .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// pass the generated Avro class to the TableSource
-val clazz = classOf[MyAvroType]
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temp", Types.DOUBLE)
+    // field "rtime" is of type SQL_TIMESTAMP
+    .field("rtime", Types.SQL_TIMESTAMP).build())
+  .withRowtimeAttribute(
+    // "rtime" is rowtime attribute
+    "rtime",
+    // value of "rtime" is extracted from existing field with same name
+    new ExistingField("rtime"),
+    // values of "rtime" are at most out-of-order by 30 seconds
+    new BoundedOutOfOrderTimestamps(30000L))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+#### Extracting Kafka 0.10+ Timestamps into Rowtime Attribute
+
+Since Kafka 0.10, Kafka messages carry a timestamp as metadata that specifies when the record was written to the Kafka topic. `KafkaTableSources` can assign Kafka's message timestamp as a rowtime attribute as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG())
+    .field("temp", Types.DOUBLE())
+    // field "rtime" is of type SQL_TIMESTAMP
+    .field("rtime", Types.SQL_TIMESTAMP()).build())
+  // use Kafka timestamp as rowtime attribute
+  .withKafkaTimestampAsRowtimeAttribute(
+    // "rtime" is rowtime attribute
+    "rtime",
+    // values of "rtime" are ascending
+    new AscendingTimestamps())
+  .build();
+{% endhighlight %}
+</div>
 
-val kafkaTableSource = new Kafka08AvroTableSource(
-    kafkaTopic,
-    kafkaProperties,
-    clazz)
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  .withSchema(TableSchema.builder()
+    .field("sensorId", Types.LONG)
+    .field("temp", Types.DOUBLE)
+    // field "rtime" is of type SQL_TIMESTAMP
+    .field("rtime", Types.SQL_TIMESTAMP).build())
+  // use Kafka timestamp as rowtime attribute
+  .withKafkaTimestampAsRowtimeAttribute(
+    // "rtime" is rowtime attribute
+    "rtime",
+    // values of "rtime" are ascending
+    new AscendingTimestamps())
+  .build()
 {% endhighlight %}
 </div>
 </div>
 
+#### Provided TimestampExtractors
+
+Flink provides the following `TimestampExtractor` implementations for common use cases:
+
+* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP` field.
+* `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note that this `TimestampExtractor` is not available for batch table sources.
+
+A custom `TimestampExtractor` can be defined by implementing the corresponding interface.
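+
+For example, `StreamRecordTimestamp` can be passed to the builder's `withRowtimeAttribute()` method just like `ExistingField` in the example above. The following is a sketch along the same pattern:
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  .withRowtimeAttribute(
+    // "rtime" is the rowtime attribute
+    "rtime",
+    // value of "rtime" is taken from the timestamp of the StreamRecord
+    new StreamRecordTimestamp(),
+    // values of "rtime" are at most out-of-order by 30 seconds
+    new BoundedOutOfOrderTimestamps(30000L))
+  .build();
+{% endhighlight %}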
+
+#### Provided WatermarkStrategies
+
+Flink provides the following `WatermarkStrategy` implementations for common use cases:
+
+* `AscendingTimestamps`: A watermark strategy for ascending timestamps. Records with timestamps that are out-of-order will be considered late.
+* `BoundedOutOfOrderTimestamps(delay)`: A watermark strategy for timestamps that are at most out-of-order by the specified delay.
+
+A custom `WatermarkStrategy` can be defined by implementing the corresponding interface.
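+
+As an illustration, the following sketch shows a custom periodic strategy that emits watermarks which trail the highest observed timestamp by a fixed lag (functionally similar to the provided `BoundedOutOfOrderTimestamps`). It assumes the `PeriodicWatermarkAssigner` base class in `org.apache.flink.table.sources.wmstrategies`; verify the exact method signatures against that interface:
+
+{% highlight java %}
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
+import org.apache.flink.types.Row;
+
+public class FixedLagWatermarks extends PeriodicWatermarkAssigner {
+
+  private final long lag;
+  private long maxTimestamp = Long.MIN_VALUE;
+
+  public FixedLagWatermarks(long lag) {
+    this.lag = lag;
+  }
+
+  @Override
+  public void trackEvent(Row row, long timestamp) {
+    // remember the highest timestamp observed so far
+    maxTimestamp = Math.max(maxTimestamp, timestamp);
+  }
+
+  @Override
+  public Watermark nextWatermark() {
+    if (maxTimestamp == Long.MIN_VALUE) {
+      // no event seen yet; emit the minimal watermark
+      return new Watermark(Long.MIN_VALUE);
+    }
+    // emit a watermark that trails the highest timestamp by the configured lag
+    return new Watermark(maxTimestamp - lag);
+  }
+}
+{% endhighlight %}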
+
 {% top %}
 
 ### CsvTableSource

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index 9921428..01e6329 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -37,22 +42,100 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	 *
 	 * @param topic      Kafka topic to consume.
 	 * @param properties Properties for the Kafka consumer.
+	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
 	 */
 	public Kafka010AvroTableSource(
 		String topic,
 		Properties properties,
+		TableSchema schema,
 		Class<? extends SpecificRecordBase> record) {
 
 		super(
 			topic,
 			properties,
+			schema,
 			record);
 	}
 
+	/**
+	 * Sets a mapping from schema fields to fields of the produced Avro record.
+	 *
+	 * <p>A field mapping is required if the fields of the produced table should be named differently than
+	 * the fields of the Avro record.
+	 * The key of the provided Map refers to the field of the table schema,
+	 * the value to the field of the Avro record.</p>
+	 *
+	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+	}
+
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
+	 * @return A builder to configure and create a {@link Kafka010AvroTableSource}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka010AvroTableSource}.
+	 */
+	public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return true;
+		}
+
+		@Override
+		protected Kafka010AvroTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka010AvroTableSource}.
+		 *
+		 * @return A configured {@link Kafka010AvroTableSource}.
+		 */
+		@Override
+		public Kafka010AvroTableSource build() {
+			Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getAvroRecordClass());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index f400f6b..e263cf2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -33,21 +37,103 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	/**
 	 * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param typeInfo   Type information describing the result type. The field names are used
-	 *                   to parse the JSON file and so are the types.
+	 * @param topic       Kafka topic to consume.
+	 * @param properties  Properties for the Kafka consumer.
+	 * @param tableSchema The schema of the table.
+	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 	 */
 	public Kafka010JsonTableSource(
-			String topic,
-			Properties properties,
-			TypeInformation<Row> typeInfo) {
+		String topic,
+		Properties properties,
+		TableSchema tableSchema,
+		TableSchema jsonSchema) {
 
-		super(topic, properties, typeInfo);
+		super(topic, properties, tableSchema, jsonSchema);
+	}
+
+	/**
+	 * Sets the flag that specifies the behavior in case of missing fields.
+	 * If set to true, the TableSource fails on missing fields. If set to false, missing fields are set to null.
+	 *
+	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 */
+	@Override
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		super.setFailOnMissingField(failOnMissingField);
+	}
+
+	/**
+	 * Sets the mapping from table schema fields to JSON schema fields.
+	 *
+	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 	}
 
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
+	 * @return A builder to configure and create a {@link Kafka010JsonTableSource}.
+	 */
+	public static Kafka010JsonTableSource.Builder builder() {
+		return new Kafka010JsonTableSource.Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka010JsonTableSource}.
+	 */
+	public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return true;
+		}
+
+		@Override
+		protected Kafka010JsonTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka010JsonTableSource}.
+		 *
+		 * @return A configured {@link Kafka010JsonTableSource}.
+		 */
+		@Override
+		public Kafka010JsonTableSource build() {
+			Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getJsonSchema());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index a6de13a..5475c9f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -28,7 +29,10 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010TableSource extends Kafka09TableSource {
+public abstract class Kafka010TableSource extends KafkaTableSource {
+
+	// The deserialization schema for the Kafka records
+	private final DeserializationSchema<Row> deserializationSchema;
 
 	/**
 	 * Creates a Kafka 0.10 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka010TableSource extends Kafka09TableSource {
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
+			TableSchema schema,
 			TypeInformation<Row> typeInfo) {
 
-		super(topic, properties, deserializationSchema, typeInfo);
+		super(topic, properties, schema, typeInfo);
+
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	@Override
+	public DeserializationSchema<Row> getDeserializationSchema() {
+		return this.deserializationSchema;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index 025fefc..e977d49 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -18,25 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka010AvroTableSource}.
  */
-public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
-		return new Kafka010AvroTableSource(
-			topic,
-			properties,
-			AvroSpecificRecord.class);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka010AvroTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 092f5ea..9a0ab04 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka010JsonTableSource}.
  */
-public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-		return new Kafka010JsonTableSource(topic, properties, typeInfo);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka010JsonTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index edc37cb..81d3496 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -37,22 +42,100 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	 *
 	 * @param topic      Kafka topic to consume.
 	 * @param properties Properties for the Kafka consumer.
+	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
 	 */
 	public Kafka011AvroTableSource(
 		String topic,
 		Properties properties,
+		TableSchema schema,
 		Class<? extends SpecificRecordBase> record) {
 
 		super(
 			topic,
 			properties,
+			schema,
 			record);
 	}
 
+	/**
+	 * Sets a mapping from schema fields to fields of the produced Avro record.
+	 *
+	 * <p>A field mapping is required if the fields of the produced table should be named differently than
+	 * the fields of the Avro record.
+	 * The key of the provided Map refers to the field of the table schema,
+	 * the value to the field of the Avro record.</p>
+	 *
+	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+	}
+
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
+	 * @return A builder to configure and create a {@link Kafka011AvroTableSource}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka011AvroTableSource}.
+	 */
+	public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return true;
+		}
+
+		@Override
+		protected Kafka011AvroTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka011AvroTableSource}.
+		 *
+		 * @return A configured {@link Kafka011AvroTableSource}.
+		 */
+		@Override
+		public Kafka011AvroTableSource build() {
+			Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getAvroRecordClass());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 471c2d2..1aff670 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -33,21 +37,103 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	/**
 	 * Creates a Kafka 0.11 JSON {@link StreamTableSource}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param typeInfo   Type information describing the result type. The field names are used
-	 *                   to parse the JSON file and so are the types.
+	 * @param topic       Kafka topic to consume.
+	 * @param properties  Properties for the Kafka consumer.
+	 * @param tableSchema The schema of the table.
+	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 	 */
 	public Kafka011JsonTableSource(
-			String topic,
-			Properties properties,
-			TypeInformation<Row> typeInfo) {
+		String topic,
+		Properties properties,
+		TableSchema tableSchema,
+		TableSchema jsonSchema) {
 
-		super(topic, properties, typeInfo);
+		super(topic, properties, tableSchema, jsonSchema);
+	}
+
+	/**
+	 * Sets the flag that specifies the behavior in case of missing fields.
+	 * If set to true, the TableSource fails on missing fields. If set to false, missing fields are set to null.
+	 *
+	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 */
+	@Override
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		super.setFailOnMissingField(failOnMissingField);
+	}
+
+	/**
+	 * Sets the mapping from table schema fields to JSON schema fields.
+	 *
+	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 	}
 
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
+	 * @return A builder to configure and create a {@link Kafka011JsonTableSource}.
+	 */
+	public static Kafka011JsonTableSource.Builder builder() {
+		return new Kafka011JsonTableSource.Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka011JsonTableSource}.
+	 */
+	public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return true;
+		}
+
+		@Override
+		protected Kafka011JsonTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka011JsonTableSource}.
+		 *
+		 * @return A configured {@link Kafka011JsonTableSource}.
+		 */
+		@Override
+		public Kafka011JsonTableSource build() {
+			Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getJsonSchema());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 5eaea97..576a421 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -28,7 +29,10 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.11.
  */
-public class Kafka011TableSource extends Kafka09TableSource {
+public abstract class Kafka011TableSource extends KafkaTableSource {
+
+	// The deserialization schema for the Kafka records
+	private final DeserializationSchema<Row> deserializationSchema;
 
 	/**
 	 * Creates a Kafka 0.11 {@link StreamTableSource}.
@@ -43,13 +47,22 @@ public class Kafka011TableSource extends Kafka09TableSource {
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
+			TableSchema schema,
 			TypeInformation<Row> typeInfo) {
 
-		super(topic, properties, deserializationSchema, typeInfo);
+		super(topic, properties, schema, typeInfo);
+
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	@Override
+	public DeserializationSchema<Row> getDeserializationSchema() {
+		return this.deserializationSchema;
 	}
 
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index e60bf17..bde0761 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -18,25 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka011AvroTableSource}.
  */
-public class Kafka011AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
-		return new Kafka011AvroTableSource(
-			topic,
-			properties,
-			AvroSpecificRecord.class);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka011AvroTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index c2e256c..6870ecc 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka011JsonTableSource}.
  */
-public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-		return new Kafka011JsonTableSource(topic, properties, typeInfo);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka011JsonTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index a1bea78..998f4d4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -37,22 +42,100 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	 *
 	 * @param topic      Kafka topic to consume.
 	 * @param properties Properties for the Kafka consumer.
+	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
 	 */
 	public Kafka08AvroTableSource(
 		String topic,
 		Properties properties,
+		TableSchema schema,
 		Class<? extends SpecificRecordBase> record) {
 
 		super(
 			topic,
 			properties,
+			schema,
 			record);
 	}
 
+	/**
+	 * Sets a mapping from schema fields to fields of the produced Avro record.
+	 *
+	 * <p>A field mapping is required if the fields of the produced table should be named differently than
+	 * the fields of the Avro record.
+	 * The key of the provided Map refers to the field of the table schema,
+	 * the value to the field of the Avro record.</p>
+	 *
+	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+	}
+
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
+	 * @return A builder to configure and create a {@link Kafka08AvroTableSource}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka08AvroTableSource}.
+	 */
+	public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return false;
+		}
+
+		@Override
+		protected Kafka08AvroTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka08AvroTableSource}.
+		 *
+		 * @return A configured {@link Kafka08AvroTableSource}.
+		 */
+		@Override
+		public Kafka08AvroTableSource build() {
+			Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getAvroRecordClass());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }
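
For illustration, a minimal usage sketch of the new builder. The fluent methods (forTopic(), withKafkaProperties(), withSchema(), withProctimeAttribute(), forAvroRecordClass()) are defined in the KafkaTableSource and KafkaAvroTableSource base builders elsewhere in this commit, so their exact names are assumptions here; the topic, group id, and SensorReading record class are invented for the example:

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;

    // Sketch only: configure a Kafka 0.8 Avro table source with a processing
    // time attribute. Builder method names are assumed from the base builders
    // added by this commit; SensorReading is a hypothetical Avro SpecificRecord.
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181"); // Kafka 0.8 requires ZooKeeper
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demoGroup");

    Kafka08AvroTableSource source = Kafka08AvroTableSource.builder()
        .forTopic("sensors")                        // hypothetical topic
        .withKafkaProperties(props)
        .withSchema(new TableSchema(
            new String[]{"id", "temp", "proctime"},
            new TypeInformation<?>[]{Types.STRING, Types.DOUBLE, Types.SQL_TIMESTAMP}))
        .withProctimeAttribute("proctime")          // see setProctimeAttribute above
        .forAvroRecordClass(SensorReading.class)
        .build();

Note that the proctime field appears only in the table schema, not in the Avro record: the processing time attribute is generated by the runtime rather than read from the message.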
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 05a2c71..aab9ea8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -33,21 +37,103 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource {
 	/**
 	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param typeInfo   Type information describing the result type. The field names are used
-	 *                   to parse the JSON file and so are the types.
+	 * @param topic       Kafka topic to consume.
+	 * @param properties  Properties for the Kafka consumer.
+	 * @param tableSchema The schema of the table.
+	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 	 */
 	public Kafka08JsonTableSource(
-			String topic,
-			Properties properties,
-			TypeInformation<Row> typeInfo) {
+		String topic,
+		Properties properties,
+		TableSchema tableSchema,
+		TableSchema jsonSchema) {
 
-		super(topic, properties, typeInfo);
+		super(topic, properties, tableSchema, jsonSchema);
+	}
+
+	/**
+	 * Sets the flag that specifies the behavior in case of missing fields.
+	 * If set to true, the TableSource fails on missing fields; if set to false, missing fields are set to null.
+	 *
+	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 */
+	@Override
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		super.setFailOnMissingField(failOnMissingField);
+	}
+
+	/**
+	 * Sets the mapping from table schema fields to JSON schema fields.
+	 *
+	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 	}
 
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka08JsonTableSource}.
+	 * @return A builder to configure and create a {@link Kafka08JsonTableSource}.
+	 */
+	public static Kafka08JsonTableSource.Builder builder() {
+		return new Kafka08JsonTableSource.Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka08JsonTableSource}.
+	 */
+	public static class Builder extends KafkaJsonTableSource.Builder<Kafka08JsonTableSource, Kafka08JsonTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return false;
+		}
+
+		@Override
+		protected Kafka08JsonTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka08JsonTableSource}.
+		 *
+		 * @return A configured {@link Kafka08JsonTableSource}.
+		 */
+		@Override
+		public Kafka08JsonTableSource build() {
+			Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getJsonSchema());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }
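
A corresponding sketch for the JSON source with an event-time (rowtime) attribute. withRowtimeAttribute(), ExistingField, and BoundedOutOfOrderTimestamps come from other files touched by this commit (the base builder and the tsextractors/wmstrategies packages), so the exact signatures and package locations are assumptions:

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.tsextractors.ExistingField;      // assumed package
    import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps; // assumed package

    // Sketch only: a Kafka 0.8 JSON source whose "ts" field becomes the rowtime
    // attribute; watermarks trail the largest observed timestamp by 30 seconds.
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demoGroup");

    Kafka08JsonTableSource source = Kafka08JsonTableSource.builder()
        .forTopic("orders")                          // hypothetical topic
        .withKafkaProperties(props)
        .withSchema(new TableSchema(
            new String[]{"orderId", "amount", "ts"},
            new TypeInformation<?>[]{Types.LONG, Types.DOUBLE, Types.SQL_TIMESTAMP}))
        .withRowtimeAttribute(
            "ts",                                    // schema field declared as rowtime
            new ExistingField("ts"),                 // timestamp comes from the JSON field itself
            new BoundedOutOfOrderTimestamps(30000L)) // bounded out-of-orderness of 30s
        .failOnMissingField(false)                   // missing JSON fields become null
        .build();

Alternatively, the same configuration can be applied through the setter shown above on an instantiated source:

    source.setRowtimeAttributeDescriptor(new RowtimeAttributeDescriptor(
        "ts", new ExistingField("ts"), new BoundedOutOfOrderTimestamps(30000L)));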

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 9536306..b2b949b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -28,7 +29,10 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.8.
  */
-public class Kafka08TableSource extends KafkaTableSource {
+public abstract class Kafka08TableSource extends KafkaTableSource {
+
+	// The deserialization schema for the Kafka records
+	private final DeserializationSchema<Row> deserializationSchema;
 
 	/**
 	 * Creates a Kafka 0.8 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka08TableSource extends KafkaTableSource {
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
+			TableSchema schema,
 			TypeInformation<Row> typeInfo) {
 
-		super(topic, properties, deserializationSchema, typeInfo);
+		super(topic, properties, schema, typeInfo);
+
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	@Override
+	public DeserializationSchema<Row> getDeserializationSchema() {
+		return this.deserializationSchema;
 	}
 
 	@Override

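Since Kafka08TableSource is now abstract, a format-specific subclass supplies the DeserializationSchema through the constructor above, and the base class hands it back via getDeserializationSchema() when the consumer is created. A hypothetical sketch of that contract (MyFormatDeserializationSchema is invented for the example):

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;

    // Sketch only: a hypothetical format-specific source on top of the
    // now-abstract Kafka08TableSource. MyFormatDeserializationSchema stands in
    // for any DeserializationSchema<Row> implementation.
    public class Kafka08MyFormatTableSource extends Kafka08TableSource {

        public Kafka08MyFormatTableSource(
                String topic,
                Properties properties,
                TableSchema schema,
                TypeInformation<Row> typeInfo) {

            // The base class stores the schema and serves it through
            // getDeserializationSchema() when the Kafka consumer is created.
            super(topic, properties, new MyFormatDeserializationSchema(typeInfo), schema, typeInfo);
        }
    }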
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
index a704c2f..348f0f2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka08AvroTableSource}.
  */
-public class Kafka08AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-		return new Kafka08AvroTableSource(
-			topic,
-			properties,
-			AvroSpecificRecord.class);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka08AvroTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index adcd3a2..01a0123 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka08JsonTableSource}.
  */
-public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-		return new Kafka08JsonTableSource(topic, properties, typeInfo);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka08JsonTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index d69187e..a90a8d8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -37,22 +42,100 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource {
 	 *
 	 * @param topic      Kafka topic to consume.
 	 * @param properties Properties for the Kafka consumer.
+	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
 	 */
 	public Kafka09AvroTableSource(
 		String topic,
 		Properties properties,
+		TableSchema schema,
 		Class<? extends SpecificRecordBase> record) {
 
 		super(
 			topic,
 			properties,
+			schema,
 			record);
 	}
 
+	/**
+	 * Sets a mapping from schema fields to fields of the produced Avro record.
+	 *
+	 * <p>A field mapping is required if the fields of the produced table should be named differently
+	 * than the fields of the Avro record.
+	 * The key of the provided Map refers to the field of the table schema,
+	 * the value to the field of the Avro record.</p>
+	 *
+	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+	}
+
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka09AvroTableSource}.
+	 * @return A builder to configure and create a {@link Kafka09AvroTableSource}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka09AvroTableSource}.
+	 */
+	public static class Builder extends KafkaAvroTableSource.Builder<Kafka09AvroTableSource, Kafka09AvroTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return false;
+		}
+
+		@Override
+		protected Kafka09AvroTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka09AvroTableSource}.
+		 *
+		 * @return A configured {@link Kafka09AvroTableSource}.
+		 */
+		@Override
+		public Kafka09AvroTableSource build() {
+			Kafka09AvroTableSource tableSource = new Kafka09AvroTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getAvroRecordClass());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 80811b2..2f057d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -33,21 +37,103 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource {
 	/**
 	 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param typeInfo   Type information describing the result type. The field names are used
-	 *                   to parse the JSON file and so are the types.
+	 * @param topic       Kafka topic to consume.
+	 * @param properties  Properties for the Kafka consumer.
+	 * @param tableSchema The schema of the table.
+	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 	 */
 	public Kafka09JsonTableSource(
-			String topic,
-			Properties properties,
-			TypeInformation<Row> typeInfo) {
+		String topic,
+		Properties properties,
+		TableSchema tableSchema,
+		TableSchema jsonSchema) {
 
-		super(topic, properties, typeInfo);
+		super(topic, properties, tableSchema, jsonSchema);
+	}
+
+	/**
+	 * Sets the flag that specifies the behavior in case of missing fields.
+	 * If set to true, the TableSource fails on missing fields; if set to false, missing fields are set to null.
+	 *
+	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 */
+	@Override
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		super.setFailOnMissingField(failOnMissingField);
+	}
+
+	/**
+	 * Sets the mapping from table schema fields to JSON schema fields.
+	 *
+	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 */
+	@Override
+	public void setFieldMapping(Map<String, String> fieldMapping) {
+		super.setFieldMapping(fieldMapping);
+	}
+
+	/**
+	 * Declares a field of the schema to be a processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	@Override
+	public void setProctimeAttribute(String proctimeAttribute) {
+		super.setProctimeAttribute(proctimeAttribute);
+	}
+
+	/**
+	 * Declares a field of the schema to be a rowtime attribute.
+	 *
+	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 */
+	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
+		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
+		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 	}
 
 	@Override
 	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
+
+	/**
+	 * Returns a builder to configure and create a {@link Kafka09JsonTableSource}.
+	 * @return A builder to configure and create a {@link Kafka09JsonTableSource}.
+	 */
+	public static Kafka09JsonTableSource.Builder builder() {
+		return new Kafka09JsonTableSource.Builder();
+	}
+
+	/**
+	 * A builder to configure and create a {@link Kafka09JsonTableSource}.
+	 */
+	public static class Builder extends KafkaJsonTableSource.Builder<Kafka09JsonTableSource, Kafka09JsonTableSource.Builder> {
+
+		@Override
+		protected boolean supportsKafkaTimestamps() {
+			return false;
+		}
+
+		@Override
+		protected Kafka09JsonTableSource.Builder builder() {
+			return this;
+		}
+
+		/**
+		 * Builds and configures a {@link Kafka09JsonTableSource}.
+		 *
+		 * @return A configured {@link Kafka09JsonTableSource}.
+		 */
+		@Override
+		public Kafka09JsonTableSource build() {
+			Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
+				getTopic(),
+				getKafkaProps(),
+				getTableSchema(),
+				getJsonSchema());
+			super.configureTableSource(tableSource);
+			return tableSource;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index bc50a4c..4d1166c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -28,7 +29,10 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.9.
  */
-public class Kafka09TableSource extends KafkaTableSource {
+public abstract class Kafka09TableSource extends KafkaTableSource {
+
+	// The deserialization schema for the Kafka records
+	private final DeserializationSchema<Row> deserializationSchema;
 
 	/**
 	 * Creates a Kafka 0.9 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public class Kafka09TableSource extends KafkaTableSource {
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
+			TableSchema schema,
 			TypeInformation<Row> typeInfo) {
 
-		super(topic, properties, deserializationSchema, typeInfo);
+		super(topic, properties, schema, typeInfo);
+
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	@Override
+	public DeserializationSchema<Row> getDeserializationSchema() {
+		return this.deserializationSchema;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
index 5e3c42c..27445e1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -18,25 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka09AvroTableSource}.
  */
-public class Kafka09AvroTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-
-		return new Kafka09AvroTableSource(
-			topic,
-			properties,
-			AvroSpecificRecord.class);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka09AvroTableSource.builder();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index ec70386..ed3fafb 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
-import java.util.Properties;
-
 /**
  * Tests for the {@link Kafka09JsonTableSource}.
  */
-public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override
-	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
-		return new Kafka09JsonTableSource(topic, properties, typeInfo);
+	protected KafkaTableSource.Builder getBuilder() {
+		return Kafka09JsonTableSource.builder();
 	}
 
 	@Override