You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:54 UTC

[09/13] flink git commit: [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors

[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors

This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee40335f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee40335f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee40335f

Branch: refs/heads/master
Commit: ee40335ffa40fb32a692fa6be70946d9a70301b2
Parents: 1632681
Author: Timo Walther <tw...@apache.org>
Authored: Wed Jun 27 13:16:49 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sqlClient.md                     |   2 +
 .../flink-connector-kafka-0.10/pom.xml          |  53 +++
 .../kafka/Kafka010AvroTableSource.java          |  31 +-
 .../kafka/Kafka010AvroTableSourceFactory.java   |  37 --
 .../kafka/Kafka010JsonTableSource.java          |  32 +-
 .../kafka/Kafka010JsonTableSourceFactory.java   |  37 --
 .../connectors/kafka/Kafka010TableSource.java   |  71 +++-
 .../kafka/Kafka010TableSourceFactory.java       |  72 ++++
 ...pache.flink.table.sources.TableSourceFactory |   3 +-
 .../Kafka010AvroTableSourceFactoryTest.java     |  37 --
 .../kafka/Kafka010AvroTableSourceTest.java      |   4 +
 .../Kafka010JsonTableSourceFactoryTest.java     |   6 +-
 .../kafka/Kafka010JsonTableSourceTest.java      |   4 +
 .../kafka/Kafka010TableSourceFactoryTest.java   |  74 ++++
 .../kafka/Kafka011AvroTableSource.java          |  30 +-
 .../kafka/Kafka011AvroTableSourceFactory.java   |  37 --
 .../kafka/Kafka011JsonTableSource.java          |  33 +-
 .../kafka/Kafka011JsonTableSourceFactory.java   |  37 --
 .../connectors/kafka/Kafka011TableSource.java   |  70 +++-
 .../kafka/Kafka011TableSourceFactory.java       |  72 ++++
 ...pache.flink.table.sources.TableSourceFactory |   3 +-
 .../Kafka011AvroTableSourceFactoryTest.java     |  37 --
 .../kafka/Kafka011AvroTableSourceTest.java      |   4 +
 .../Kafka011JsonTableSourceFactoryTest.java     |   6 +-
 .../kafka/Kafka011JsonTableSourceTest.java      |   4 +
 .../kafka/Kafka011TableSourceFactoryTest.java   |  74 ++++
 .../kafka/Kafka08AvroTableSource.java           |  31 +-
 .../kafka/Kafka08AvroTableSourceFactory.java    |  37 --
 .../kafka/Kafka08JsonTableSource.java           |  32 +-
 .../kafka/Kafka08JsonTableSourceFactory.java    |  37 --
 .../connectors/kafka/Kafka08TableSource.java    |  69 +++-
 .../kafka/Kafka08TableSourceFactory.java        |  72 ++++
 ...pache.flink.table.sources.TableSourceFactory |   3 +-
 .../Kafka08AvroTableSourceFactoryTest.java      |  37 --
 .../kafka/Kafka08AvroTableSourceTest.java       |   4 +
 .../kafka/Kafka08JsonTableSourceTest.java       |   4 +
 .../kafka/Kafka08TableSourceFactoryTest.java    |  74 ++++
 .../flink-connector-kafka-0.9/pom.xml           |  52 +++
 .../kafka/Kafka09AvroTableSource.java           |  29 +-
 .../kafka/Kafka09AvroTableSourceFactory.java    |  36 --
 .../kafka/Kafka09JsonTableSource.java           |  32 +-
 .../kafka/Kafka09JsonTableSourceFactory.java    |  37 --
 .../connectors/kafka/Kafka09TableSource.java    |  69 +++-
 .../kafka/Kafka09TableSourceFactory.java        |  72 ++++
 ...pache.flink.table.sources.TableSourceFactory |   3 +-
 .../Kafka09AvroTableSourceFactoryTest.java      |  37 --
 .../kafka/Kafka09AvroTableSourceTest.java       |   4 +
 .../Kafka09JsonTableSourceFactoryTest.java      |   4 +
 .../kafka/Kafka09JsonTableSourceTest.java       |   4 +
 .../kafka/Kafka09TableSourceFactoryTest.java    |  74 ++++
 .../connectors/kafka/KafkaAvroTableSource.java  |  83 ++---
 .../kafka/KafkaAvroTableSourceFactory.java      |  81 -----
 .../connectors/kafka/KafkaJsonTableSource.java  |  99 ++---
 .../kafka/KafkaJsonTableSourceFactory.java      |  96 -----
 .../connectors/kafka/KafkaTableSource.java      | 266 +++++++++++---
 .../kafka/KafkaTableSourceFactory.java          | 195 +++++-----
 .../KafkaAvroTableSourceFactoryTestBase.java    | 123 -------
 .../kafka/KafkaAvroTableSourceTestBase.java     |   6 +-
 .../KafkaJsonTableSourceFactoryTestBase.java    |  18 +-
 .../kafka/KafkaJsonTableSourceTestBase.java     |   6 +-
 .../kafka/KafkaTableSourceBuilderTestBase.java  | 264 ++++++++++++++
 .../kafka/KafkaTableSourceFactoryTestBase.java  | 189 ++++++++++
 .../kafka/KafkaTableSourceTestBase.java         | 260 -------------
 .../avro/AvroRowDeserializationSchema.java      |  19 +
 .../formats/avro/AvroRowFormatFactory.java      |  99 +++++
 .../avro/AvroRowSerializationSchema.java        |  18 +
 ...pache.flink.table.formats.TableFormatFactory |  16 +
 .../formats/avro/AvroRowFormatFactoryTest.java  | 104 ++++++
 .../json/JsonRowDeserializationSchema.java      |  20 +-
 .../formats/json/JsonRowFormatFactory.java      | 108 ++++++
 .../formats/json/JsonRowSchemaConverter.java    | 362 +++++++++++++++++++
 .../json/JsonRowSerializationSchema.java        |  20 +-
 .../flink/formats/json/JsonSchemaConverter.java | 362 -------------------
 .../apache/flink/table/descriptors/Json.java    |   3 +-
 .../flink/table/descriptors/JsonValidator.java  |   2 +-
 ...pache.flink.table.formats.TableFormatFactory |  16 +
 .../formats/json/JsonRowFormatFactoryTest.java  | 150 ++++++++
 .../json/JsonRowSchemaConverterTest.java        | 114 ++++++
 .../json/JsonRowSerializationSchemaTest.java    |   2 +-
 .../formats/json/JsonSchemaConverterTest.java   | 114 ------
 .../flink/table/api/BatchTableEnvironment.scala |  49 ++-
 .../table/api/StreamTableEnvironment.scala      |  49 ++-
 .../flink/table/api/TableEnvironment.scala      |  38 ++
 .../apache/flink/table/api/TableSchema.scala    |  10 +
 .../org/apache/flink/table/api/exceptions.scala |  80 ++++
 .../descriptors/DescriptorProperties.scala      |  17 +-
 .../descriptors/FormatDescriptorValidator.scala |   5 +
 .../table/descriptors/RowtimeValidator.scala    |  43 ++-
 .../table/descriptors/SchemaValidator.scala     |  46 ++-
 .../formats/DeserializationSchemaFactory.scala  |  41 +++
 .../formats/SerializationSchemaFactory.scala    |  41 +++
 .../table/formats/TableFormatFactory.scala      |  85 +++++
 .../formats/TableFormatFactoryService.scala     | 271 ++++++++++++++
 .../table/sources/CsvTableSourceFactory.scala   |   5 +-
 .../table/sources/DefinedFieldMapping.scala     |   5 +-
 .../table/sources/TableSourceFactory.scala      |   9 +
 .../sources/TableSourceFactoryService.scala     |  35 +-
 .../table/sources/definedTimeAttributes.scala   |   2 +
 ...pache.flink.table.formats.TableFormatFactory |  17 +
 ...pache.flink.table.sources.TableSourceFactory |   3 +-
 .../flink/table/descriptors/RowtimeTest.scala   |   7 +-
 .../flink/table/descriptors/SchemaTest.scala    |   5 +-
 .../StreamTableSourceDescriptorTest.scala       |  79 ----
 .../descriptors/TableSourceDescriptorTest.scala | 111 ++++++
 .../formats/TableFormatFactoryServiceTest.scala | 144 ++++++++
 .../utils/TestAmbiguousTableFormatFactory.scala |  52 +++
 .../utils/TestDeserializationSchema.scala       |  50 +++
 .../formats/utils/TestSerializationSchema.scala |  30 ++
 .../table/formats/utils/TestTableFormat.scala   |  33 ++
 .../formats/utils/TestTableFormatFactory.scala  |  65 ++++
 .../sources/TableSourceFactoryServiceTest.scala |  22 +-
 .../sources/TestFixedFormatTableFactory.scala   |  62 ++++
 .../table/sources/TestTableSourceFactory.scala  |  63 ----
 .../TestWildcardFormatTableSourceFactory.scala  |  56 +++
 .../table/utils/MockTableEnvironment.scala      |   3 +
 115 files changed, 4515 insertions(+), 2056 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 2ba8167..24af655 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -229,6 +229,8 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
 | Name              | Version       | Download               |
 | :---------------- | :------------ | :--------------------- |
 | Filesystem        |               | Built-in               |
+| Apache Kafka      | 0.9           | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
+| Apache Kafka      | 0.10          | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
 | Apache Kafka      | 0.11          | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
 
 #### Formats

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 22efc34..2fb7a32 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -202,6 +202,59 @@ under the License.
 
 	</dependencies>
 
+	<profiles>
+		<profile>
+			<!-- Create SQL Client uber jars for releases -->
+			<id>release</id>
+			<activation>
+				<property>
+					<name>release</name>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<shadedArtifactAttached>true</shadedArtifactAttached>
+									<shadedClassifierName>sql-jar</shadedClassifierName>
+									<artifactSet>
+										<includes combine.children="append">
+											<include>org.apache.kafka:*</include>
+											<include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
+											<include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
+										</includes>
+									</artifactSet>
+									<filters>
+										<filter>
+											<artifact>*:*</artifact>
+											<excludes>
+												<exclude>kafka/kafka-version.properties</exclude>
+											</excludes>
+										</filter>
+									</filters>
+									<relocations>
+										<relocation>
+											<pattern>org.apache.kafka</pattern>
+											<shadedPattern>org.apache.flink.kafka010.shaded.org.apache.kafka</shadedPattern>
+										</relocation>
+									</relocations>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index f759f61..ebbadcf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 
 	/**
@@ -46,7 +51,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	 * @param properties Properties for the Kafka consumer.
 	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public Kafka010AvroTableSource(
 		String topic,
 		Properties properties,
@@ -69,7 +76,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	 * the value to the field of the Avro record.</p>
 	 *
 	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFieldMapping(Map<String, String> fieldMapping) {
 		super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a processing time attribute.
 	 *
 	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setProctimeAttribute(String proctimeAttribute) {
 		super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a rowtime attribute.
 	 *
 	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
 		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
 		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 
 	/**
 	 * Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
+	 *
 	 * @return A builder to configure and create a {@link Kafka010AvroTableSource}.
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static Builder builder() {
 		return new Builder();
 	}
 
 	/**
 	 * A builder to configure and create a {@link Kafka010AvroTableSource}.
+	 *
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
 
 		@Override
@@ -127,7 +152,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 		 * Builds and configures a {@link Kafka010AvroTableSource}.
 		 *
 		 * @return A configured {@link Kafka010AvroTableSource}.
+		 * @deprecated Use table descriptors instead of implementation-specific builders.
 		 */
+		@Deprecated
 		@Override
 		public Kafka010AvroTableSource build() {
 			Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
deleted file mode 100644
index 3972c3f..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Factory for creating configured instances of {@link Kafka010AvroTableSource}.
- */
-public class Kafka010AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
-	@Override
-	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
-		return new Kafka010AvroTableSource.Builder();
-	}
-
-	@Override
-	protected String kafkaVersion() {
-		return CONNECTOR_VERSION_VALUE_010;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bda236f..a5e33a1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@ import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 
 	/**
@@ -43,7 +48,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	 * @param properties  Properties for the Kafka consumer.
 	 * @param tableSchema The schema of the table.
 	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public Kafka010JsonTableSource(
 		String topic,
 		Properties properties,
@@ -58,7 +65,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	 * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
 	 *
 	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFailOnMissingField(boolean failOnMissingField) {
 		super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	 * Sets the mapping from table schema fields to JSON schema fields.
 	 *
 	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFieldMapping(Map<String, String> fieldMapping) {
 		super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	 * Declares a field of the schema to be a processing time attribute.
 	 *
 	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setProctimeAttribute(String proctimeAttribute) {
 		super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	 * Declares a field of the schema to be a rowtime attribute.
 	 *
 	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
 		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
 		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 
 	/**
 	 * Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
+	 *
 	 * @return A builder to configure and create a {@link Kafka010JsonTableSource}.
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static Kafka010JsonTableSource.Builder builder() {
 		return new Kafka010JsonTableSource.Builder();
 	}
 
 	/**
 	 * A builder to configure and create a {@link Kafka010JsonTableSource}.
+	 *
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
 
 		@Override
@@ -126,6 +153,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 		 * Builds and configures a {@link Kafka010JsonTableSource}.
 		 *
 		 * @return A configured {@link Kafka010JsonTableSource}.
+		 * @deprecated Use table descriptors instead of implementation-specific builders.
 		 */
 		@Override
 		public Kafka010JsonTableSource build() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
deleted file mode 100644
index 6c128b0..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Factory for creating configured instances of {@link Kafka010JsonTableSource}.
- */
-public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
-
-	@Override
-	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
-		return new Kafka010JsonTableSource.Builder();
-	}
-
-	@Override
-	protected String kafkaVersion() {
-		return CONNECTOR_VERSION_VALUE_010;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 5a02227..f657462 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -18,48 +18,79 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-@PublicEvolving
-public abstract class Kafka010TableSource extends KafkaTableSource {
-
-	// The deserialization schema for the Kafka records
-	private final DeserializationSchema<Row> deserializationSchema;
+@Internal
+public class Kafka010TableSource extends KafkaTableSource {
 
 	/**
 	 * Creates a Kafka 0.10 {@link StreamTableSource}.
 	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param typeInfo              Type information describing the result type. The field names are used
-	 *                              to parse the JSON file and so are the types.
+	 * @param schema                      Schema of the produced table.
+	 * @param proctimeAttribute           Field name of the processing time attribute.
+	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+	 * @param fieldMapping                Mapping for the fields of the table schema to
+	 *                                    fields of the physical returned type.
+	 * @param topic                       Kafka topic to consume.
+	 * @param properties                  Properties for the Kafka consumer.
+	 * @param deserializationSchema       Deserialization schema for decoding records from Kafka.
+	 * @param startupMode                 Startup mode for the contained consumer.
+	 * @param specificStartupOffsets      Specific startup offsets; only relevant when startup
+	 *                                    mode is {@link StartupMode#SPECIFIC_OFFSETS}.
 	 */
 	public Kafka010TableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Optional<Map<String, String>> fieldMapping,
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
-			TableSchema schema,
-			TypeInformation<Row> typeInfo) {
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
-		super(topic, properties, schema, typeInfo);
-
-		this.deserializationSchema = deserializationSchema;
+		super(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
 	}
 
-	@Override
-	public DeserializationSchema<Row> getDeserializationSchema() {
-		return this.deserializationSchema;
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param schema                Schema of the produced table.
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+	 */
+	public Kafka010TableSource(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema) {
+
+		super(schema, topic, properties, deserializationSchema);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
new file mode 100644
index 0000000..4a86016
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010TableSource}.
+ */
+public class Kafka010TableSourceFactory extends KafkaTableSourceFactory {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return true;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka010TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index cf10939..21f5707 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
-org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
deleted file mode 100644
index d78bfbd..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceFactoryTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Tests for {@link Kafka010AvroTableSourceFactory}.
- */
-public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_010;
-	}
-
-	@Override
-	protected KafkaAvroTableSource.Builder builder() {
-		return Kafka010AvroTableSource.builder();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index f5f8af1..bf253c4 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
 
 /**
  * Tests for the {@link Kafka010AvroTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
index 22cf659..ab83a01 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.streaming.connectors.kafka;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
 
 /**
- * Tests for {@link Kafka010JsonTableSourceFactory}.
+ * Tests for legacy Kafka010JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index f9d4d84..087f3ed 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
 
 /**
  * Tests for the {@link Kafka010JsonTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
new file mode 100644
index 0000000..ff3b0b0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka010TableSource} created by {@link Kafka010TableSourceFactory}.
+ */
+public class Kafka010TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer010.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka010TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index f4484f6..b3f4e0a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.11.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 
 	/**
@@ -46,7 +51,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	 * @param properties Properties for the Kafka consumer.
 	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public Kafka011AvroTableSource(
 		String topic,
 		Properties properties,
@@ -69,7 +76,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	 * the value to the field of the Avro record.</p>
 	 *
 	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFieldMapping(Map<String, String> fieldMapping) {
 		super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a processing time attribute.
 	 *
 	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setProctimeAttribute(String proctimeAttribute) {
 		super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a rowtime attribute.
 	 *
 	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
 		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
 		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -103,14 +116,25 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	/**
 	 * Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
 	 * @return A builder to configure and create a {@link Kafka011AvroTableSource}.
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static Builder builder() {
 		return new Builder();
 	}
 
 	/**
 	 * A builder to configure and create a {@link Kafka011AvroTableSource}.
+	 *
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
 
 		@Override
@@ -127,7 +151,9 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 		 * Builds and configures a {@link Kafka011AvroTableSource}.
 		 *
 		 * @return A configured {@link Kafka011AvroTableSource}.
+		 * @deprecated Use table descriptors instead of implementation-specific builders.
 		 */
+		@Deprecated
 		@Override
 		public Kafka011AvroTableSource build() {
 			Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
deleted file mode 100644
index a959983..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Factory for creating configured instances of {@link Kafka011AvroTableSource}.
- */
-public class Kafka011AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
-	@Override
-	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
-		return new Kafka011AvroTableSource.Builder();
-	}
-
-	@Override
-	protected String kafkaVersion() {
-		return CONNECTOR_VERSION_VALUE_011;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index a012f5d..74c5007 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@ import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.11.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 
 	/**
@@ -43,7 +48,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	 * @param properties  Properties for the Kafka consumer.
 	 * @param tableSchema The schema of the table.
 	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public Kafka011JsonTableSource(
 		String topic,
 		Properties properties,
@@ -58,7 +65,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	 * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
 	 *
 	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFailOnMissingField(boolean failOnMissingField) {
 		super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	 * Sets the mapping from table schema fields to JSON schema fields.
 	 *
 	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFieldMapping(Map<String, String> fieldMapping) {
 		super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	 * Declares a field of the schema to be a processing time attribute.
 	 *
 	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setProctimeAttribute(String proctimeAttribute) {
 		super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	 * Declares a field of the schema to be a rowtime attribute.
 	 *
 	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
 		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
 		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 
 	/**
 	 * Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
+	 *
 	 * @return A builder to configure and create a {@link Kafka011JsonTableSource}.
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static Kafka011JsonTableSource.Builder builder() {
 		return new Kafka011JsonTableSource.Builder();
 	}
 
 	/**
 	 * A builder to configure and create a {@link Kafka011JsonTableSource}.
+	 *
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
 
 		@Override
@@ -126,7 +153,9 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 		 * Builds and configures a {@link Kafka011JsonTableSource}.
 		 *
 		 * @return A configured {@link Kafka011JsonTableSource}.
+		 * @deprecated Use table descriptors instead of implementation-specific builders.
 		 */
+		@Deprecated
 		@Override
 		public Kafka011JsonTableSource build() {
 			Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
deleted file mode 100644
index 53bf7be..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Factory for creating configured instances of {@link Kafka011JsonTableSource}.
- */
-public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
-
-	@Override
-	protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
-		return new Kafka011JsonTableSource.Builder();
-	}
-
-	@Override
-	protected String kafkaVersion() {
-		return CONNECTOR_VERSION_VALUE_011;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 6c9c37d..c1d8820 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -18,53 +18,83 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.11.
  */
-@PublicEvolving
-public abstract class Kafka011TableSource extends KafkaTableSource {
+@Internal
+public class Kafka011TableSource extends KafkaTableSource {
 
-	// The deserialization schema for the Kafka records
-	private final DeserializationSchema<Row> deserializationSchema;
+	/**
+	 * Creates a Kafka 0.11 {@link StreamTableSource}.
+	 *
+	 * @param schema                      Schema of the produced table.
+	 * @param proctimeAttribute           Field name of the processing time attribute.
+	 * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes.
+	 * @param fieldMapping                Mapping for the fields of the table schema to
+	 *                                    fields of the physical returned type.
+	 * @param topic                       Kafka topic to consume.
+	 * @param properties                  Properties for the Kafka consumer.
+	 * @param deserializationSchema       Deserialization schema for decoding records from Kafka.
+	 * @param startupMode                 Startup mode for the contained consumer.
+	 * @param specificStartupOffsets      Specific startup offsets; only relevant when startup
+	 *                                    mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 */
+	public Kafka011TableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Optional<Map<String, String>> fieldMapping,
+			String topic, Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		super(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
 
 	/**
 	 * Creates a Kafka 0.11 {@link StreamTableSource}.
 	 *
+	 * @param schema                Schema of the produced table.
 	 * @param topic                 Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param typeInfo              Type information describing the result type. The field names are used
-	 *                              to parse the JSON file and so are the types.
+	 * @param deserializationSchema Deserialization schema for decoding records from Kafka.
 	 */
 	public Kafka011TableSource(
+			TableSchema schema,
 			String topic,
 			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			TableSchema schema,
-			TypeInformation<Row> typeInfo) {
-
-		super(topic, properties, schema, typeInfo);
+			DeserializationSchema<Row> deserializationSchema) {
 
-		this.deserializationSchema = deserializationSchema;
-	}
-
-	@Override
-	public DeserializationSchema<Row> getDeserializationSchema() {
-		return this.deserializationSchema;
+		super(schema, topic, properties, deserializationSchema);
 	}
 
 	@Override
 	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
new file mode 100644
index 0000000..b1e3929
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011TableSource}.
+ */
+public class Kafka011TableSourceFactory extends KafkaTableSourceFactory {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return true;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka011TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
index f6825ad..c056097 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory
-org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
deleted file mode 100644
index 787a525..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceFactoryTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Tests for {@link Kafka011AvroTableSourceFactory}.
- */
-public class Kafka011AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_011;
-	}
-
-	@Override
-	protected KafkaAvroTableSource.Builder builder() {
-		return Kafka011AvroTableSource.builder();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index e348aa6..aa083a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
 
 /**
  * Tests for the {@link Kafka011AvroTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
index ed92863..ae46dac 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.streaming.connectors.kafka;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
 
 /**
- * Tests for {@link Kafka011JsonTableSourceFactory}.
+ * Tests for legacy Kafka011JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index b35d1c5..451795a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.types.Row;
 
 /**
  * Tests for the {@link Kafka011JsonTableSource}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sources.
  */
+@Deprecated
 public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
new file mode 100644
index 0000000..abaa490
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka011TableSource} created by {@link Kafka011TableSourceFactory}.
+ */
+public class Kafka011TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka011TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 3c16722..8206287 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@ import java.util.Properties;
 
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.8.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 
 	/**
@@ -46,7 +51,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	 * @param properties Properties for the Kafka consumer.
 	 * @param schema     Schema of the produced table.
 	 * @param record     Avro specific record.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public Kafka08AvroTableSource(
 		String topic,
 		Properties properties,
@@ -69,7 +76,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	 * the value to the field of the Avro record.</p>
 	 *
 	 * @param fieldMapping A mapping from schema fields to Avro fields.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setFieldMapping(Map<String, String> fieldMapping) {
 		super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a processing time attribute.
 	 *
 	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	@Override
 	public void setProctimeAttribute(String proctimeAttribute) {
 		super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	 * Declares a field of the schema to be a rowtime attribute.
 	 *
 	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
+	 * @deprecated Use table descriptors instead of implementation-specific builders.
 	 */
+	@Deprecated
 	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
 		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
 		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 
 	/**
 	 * Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
+	 *
 	 * @return A builder to configure and create a {@link Kafka08AvroTableSource}.
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static Builder builder() {
 		return new Builder();
 	}
 
 	/**
 	 * A builder to configure and create a {@link Kafka08AvroTableSource}.
+	 *
+	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+	 *             with descriptors for schema and format instead. Descriptors allow for
+	 *             implementation-agnostic definition of tables. See also
+	 *             {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
 	 */
+	@Deprecated
 	public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
 
 		@Override
@@ -127,7 +152,9 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 		 * Builds and configures a {@link Kafka08AvroTableSource}.
 		 *
 		 * @return A configured {@link Kafka08AvroTableSource}.
+		 * @deprecated Use table descriptors instead of implementation-specific builders.
 		 */
+		@Deprecated
 		@Override
 		public Kafka08AvroTableSource build() {
 			Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
deleted file mode 100644
index aefc4db..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-
-/**
- * Factory for creating configured instances of {@link Kafka08AvroTableSource}.
- */
-public class Kafka08AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
-
-	@Override
-	protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
-		return new Kafka08AvroTableSource.Builder();
-	}
-
-	@Override
-	protected String kafkaVersion() {
-		return CONNECTOR_VERSION_VALUE_08;
-	}
-}