Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/30 13:58:22 UTC

[GitHub] asfgit closed pull request #7182: [FLINK-11015] Remove deprecated methods and classes about table from all the Kafka connectors

URL: https://github.com/apache/flink/pull/7182

This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request once it is merged, the diff is reproduced below for the sake of provenance.
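
The deprecation notices on every class removed here point to the descriptor-based API as the replacement. For context, a source that was previously registered through, e.g., Kafka010JsonTableSource.builder() would be declared roughly as follows against the Flink 1.7-era descriptor API (a minimal sketch, not part of this PR; the topic, properties, and fields are placeholders):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    public class DescriptorMigrationSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // Connector, format, and schema are declared independently of the
            // Kafka-version-specific classes that this PR removes.
            tableEnv
                .connect(
                    new Kafka()
                        .version("0.10")                    // was: the Kafka010* class family
                        .topic("sensor-readings")           // placeholder topic
                        .property("bootstrap.servers", "localhost:9092")
                        .startFromEarliest())
                .withFormat(
                    new Json()
                        .failOnMissingField(false)          // was: setFailOnMissingField(false)
                        .deriveSchema())                    // derive the JSON schema from the table schema
                .withSchema(
                    new Schema()
                        .field("id", Types.STRING)
                        .field("temperature", Types.DOUBLE))
                .inAppendMode()
                .registerTableSource("SensorReadings");
        }
    }

Note the pom changes below: with the format-specific sources and sinks gone, the connectors no longer need their optional flink-avro / flink-json dependencies; the Json and Avro descriptors ship with those format modules instead.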

diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 53a72679ae2..75ced033e90 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -86,40 +86,8 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<!-- test dependencies -->
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
deleted file mode 100644
index d9d0a91ced1..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka010AvroTableSource extends KafkaAvroTableSource {
-
-	/**
-	 * Creates a Kafka 0.10 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param schema     Schema of the produced table.
-	 * @param record     Avro specific record.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka010AvroTableSource(
-		String topic,
-		Properties properties,
-		TableSchema schema,
-		Class<? extends SpecificRecordBase> record) {
-
-		super(
-			topic,
-			properties,
-			schema,
-			record);
-	}
-
-	/**
-	 * Sets a mapping from schema fields to fields of the produced Avro record.
-	 *
-	 * <p>A field mapping is required if the fields of produced tables should be named different than
-	 * the fields of the Avro record.
-	 * The key of the provided Map refers to the field of the table schema,
-	 * the value to the field of the Avro record.</p>
-	 *
-	 * @param fieldMapping A mapping from schema fields to Avro fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
-	 *
-	 * @return A builder to configure and create a {@link Kafka010AvroTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Builder builder() {
-		return new Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka010AvroTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return true;
-		}
-
-		@Override
-		protected Kafka010AvroTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka010AvroTableSource}.
-		 *
-		 * @return A configured {@link Kafka010AvroTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka010AvroTableSource build() {
-			Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getAvroRecordClass());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
-
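For the Avro source removed above, the descriptor-based counterpart replaces the builder's record class with a format descriptor. Continuing the sketch from the top of this message (SensorReading stands in for any Avro-generated SpecificRecord class; an assumed name, not from this PR):

    // requires org.apache.flink.table.descriptors.Avro from flink-avro
    tableEnv
        .connect(
            new Kafka()
                .version("0.10")
                .topic("sensor-readings")                   // placeholder topic
                .property("bootstrap.servers", "localhost:9092"))
        .withFormat(
            // was: Kafka010AvroTableSource.builder()... with an Avro record class
            new Avro().recordClass(SensorReading.class))
        .withSchema(new Schema().field("id", Types.STRING))
        .inAppendMode()
        .registerTableSource("AvroSensorReadings");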
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
deleted file mode 100644
index b9a5350e618..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka010JsonTableSink extends KafkaJsonTableSink {
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
-	 * topic with fixed partition assignment.
-	 *
-	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
-	 * <ul>
-	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
-	 * sink instances will write to the same partition.</li>
-	 * <li>If the number of Kafka partitions is higher than the number of sink instance, some
-	 * Kafka partitions won't receive data.</li>
-	 * </ul>
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka010JsonTableSink(String topic, Properties properties) {
-		super(topic, properties, new FlinkFixedPartitioner<>());
-	}
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
-	 * topic with custom partition assignment.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-			String topic,
-			Properties properties,
-			SerializationSchema<Row> serializationSchema,
-			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
-		return new FlinkKafkaProducer010<>(
-			topic,
-			serializationSchema,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-
-	@Override
-	protected Kafka010JsonTableSink createCopy() {
-		return new Kafka010JsonTableSink(
-			topic,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-}
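The fixed partition assignment documented in the constructor above is what FlinkFixedPartitioner provides; the mapping described in the Javadoc amounts to a modulo over the sink's parallel instance id (a sketch of the behavior, not the removed code):

    // Each parallel sink instance always targets one partition. With fewer
    // partitions than sink instances, the modulo wraps and instances share
    // a partition; with more partitions than instances, the surplus
    // partitions receive no data.
    int partitionFor(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }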
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
deleted file mode 100644
index 38d9034a9a7..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka010JsonTableSource extends KafkaJsonTableSource {
-
-	/**
-	 * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
-	 *
-	 * @param topic       Kafka topic to consume.
-	 * @param properties  Properties for the Kafka consumer.
-	 * @param tableSchema The schema of the table.
-	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka010JsonTableSource(
-		String topic,
-		Properties properties,
-		TableSchema tableSchema,
-		TableSchema jsonSchema) {
-
-		super(topic, properties, tableSchema, jsonSchema);
-	}
-
-	/**
-	 * Sets the flag that specifies the behavior in case of missing fields.
-	 * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
-	 *
-	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		super.setFailOnMissingField(failOnMissingField);
-	}
-
-	/**
-	 * Sets the mapping from table schema fields to JSON schema fields.
-	 *
-	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
-	 *
-	 * @return A builder to configure and create a {@link Kafka010JsonTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Kafka010JsonTableSource.Builder builder() {
-		return new Kafka010JsonTableSource.Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka010JsonTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return true;
-		}
-
-		@Override
-		protected Kafka010JsonTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka010JsonTableSource}.
-		 *
-		 * @return A configured {@link Kafka010JsonTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Override
-		public Kafka010JsonTableSource build() {
-			Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getJsonSchema());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
deleted file mode 100644
index 22547af31dd..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka010AvroTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka010AvroTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) AvroRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer010.class;
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
deleted file mode 100644
index 4575f8763df..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
-
-import java.util.Properties;
-
-/**
- * Tests for the {@link Kafka010JsonTableSink}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sinks.
- */
-@Deprecated
-public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
-
-	@Override
-	protected KafkaTableSinkBase createTableSink(
-			String topic,
-			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner) {
-
-		return new Kafka010JsonTableSink(
-			topic,
-			properties,
-			partitioner);
-	}
-
-	@Override
-	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
-		return JsonRowSerializationSchema.class;
-	}
-
-	@Override
-	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
-		return FlinkKafkaProducer010.class;
-	}
-
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
deleted file mode 100644
index ab83a01c48f..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-
-/**
- * Tests for legacy Kafka010JsonTableSourceFactory.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_010;
-	}
-
-	@Override
-	protected KafkaJsonTableSource.Builder builder() {
-		return Kafka010JsonTableSource.builder();
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
deleted file mode 100644
index 330e55ad0e3..00000000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka010JsonTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka010JsonTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer010.class;
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 9ace51043db..cf25ee933bb 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -86,22 +86,6 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>
@@ -112,23 +96,6 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
deleted file mode 100644
index fab592f1bda..00000000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.11.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka011AvroTableSource extends KafkaAvroTableSource {
-
-	/**
-	 * Creates a Kafka 0.11 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param schema     Schema of the produced table.
-	 * @param record     Avro specific record.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka011AvroTableSource(
-		String topic,
-		Properties properties,
-		TableSchema schema,
-		Class<? extends SpecificRecordBase> record) {
-
-		super(
-			topic,
-			properties,
-			schema,
-			record);
-	}
-
-	/**
-	 * Sets a mapping from schema fields to fields of the produced Avro record.
-	 *
-	 * <p>A field mapping is required if the fields of produced tables should be named different than
-	 * the fields of the Avro record.
-	 * The key of the provided Map refers to the field of the table schema,
-	 * the value to the field of the Avro record.</p>
-	 *
-	 * @param fieldMapping A mapping from schema fields to Avro fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
-	 * @return A builder to configure and create a {@link Kafka011AvroTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Builder builder() {
-		return new Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka011AvroTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return true;
-		}
-
-		@Override
-		protected Kafka011AvroTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka011AvroTableSource}.
-		 *
-		 * @return A configured {@link Kafka011AvroTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka011AvroTableSource build() {
-			Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getAvroRecordClass());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
deleted file mode 100644
index 375eeadffb9..00000000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.11.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka011JsonTableSource extends KafkaJsonTableSource {
-
-	/**
-	 * Creates a Kafka 0.11 JSON {@link StreamTableSource}.
-	 *
-	 * @param topic       Kafka topic to consume.
-	 * @param properties  Properties for the Kafka consumer.
-	 * @param tableSchema The schema of the table.
-	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka011JsonTableSource(
-		String topic,
-		Properties properties,
-		TableSchema tableSchema,
-		TableSchema jsonSchema) {
-
-		super(topic, properties, tableSchema, jsonSchema);
-	}
-
-	/**
-	 * Sets the flag that specifies the behavior in case of missing fields.
-	 * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
-	 *
-	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		super.setFailOnMissingField(failOnMissingField);
-	}
-
-	/**
-	 * Sets the mapping from table schema fields to JSON schema fields.
-	 *
-	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
-	 *
-	 * @return A builder to configure and create a {@link Kafka011JsonTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Kafka011JsonTableSource.Builder builder() {
-		return new Kafka011JsonTableSource.Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka011JsonTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return true;
-		}
-
-		@Override
-		protected Kafka011JsonTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka011JsonTableSource}.
-		 *
-		 * @return A configured {@link Kafka011JsonTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka011JsonTableSource build() {
-			Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getJsonSchema());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
deleted file mode 100644
index f7bed64d556..00000000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka011AvroTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka011AvroTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) AvroRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer011.class;
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
deleted file mode 100644
index ae46dac6a6a..00000000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-
-/**
- * Tests for legacy Kafka011JsonTableSourceFactory.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_011;
-	}
-
-	@Override
-	protected KafkaJsonTableSource.Builder builder() {
-		return Kafka011JsonTableSource.builder();
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
deleted file mode 100644
index 1a851c1c662..00000000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka011JsonTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka011JsonTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer011.class;
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index b3a65931869..7b360689e38 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -81,22 +81,6 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -151,22 +135,6 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
deleted file mode 100644
index 61c96bf1e5a..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.8.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka08AvroTableSource extends KafkaAvroTableSource {
-
-	/**
-	 * Creates a Kafka 0.8 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param schema     Schema of the produced table.
-	 * @param record     Avro specific record.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka08AvroTableSource(
-		String topic,
-		Properties properties,
-		TableSchema schema,
-		Class<? extends SpecificRecordBase> record) {
-
-		super(
-			topic,
-			properties,
-			schema,
-			record);
-	}
-
-	/**
-	 * Sets a mapping from schema fields to fields of the produced Avro record.
-	 *
-	 * <p>A field mapping is required if the fields of the produced table should be named differently than
-	 * the fields of the Avro record.
-	 * The key of the provided Map refers to the field of the table schema,
-	 * the value to the field of the Avro record.</p>
-	 *
-	 * @param fieldMapping A mapping from schema fields to Avro fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
-	 *
-	 * @return A builder to configure and create a {@link Kafka08AvroTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Builder builder() {
-		return new Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka08AvroTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return false;
-		}
-
-		@Override
-		protected Kafka08AvroTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka08AvroTableSource}.
-		 *
-		 * @return A configured {@link Kafka08AvroTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka08AvroTableSource build() {
-			Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getAvroRecordClass());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
-
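
The deprecation notices in the file above all point at the same migration path: the Kafka, format, and schema descriptors wired together through TableEnvironment#connect. As a rough sketch of that path for an Avro source (the topic, server addresses, and OrderRecord class below are illustrative placeholders, not taken from this PR):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Avro;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    public class AvroSourceMigrationSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            tableEnv
                .connect(new Kafka()
                    .version("0.8")                                   // connector version string
                    .topic("orders")                                  // placeholder topic
                    .property("bootstrap.servers", "localhost:9092")
                    .property("zookeeper.connect", "localhost:2181")  // the 0.8 consumer additionally needs ZooKeeper
                    .startFromEarliest())
                .withFormat(new Avro()
                    .recordClass(OrderRecord.class))                  // hypothetical Avro SpecificRecord class
                .withSchema(new Schema()
                    .field("id", Types.LONG)
                    .field("product", Types.STRING))
                .inAppendMode()
                .registerTableSource("Orders");
        }
    }

This assembles the same pieces the deleted builder wired together internally, but in an implementation-agnostic way.
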
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
deleted file mode 100644
index b7474e23651..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
-	 * topic with fixed partition assignment.
-	 *
-	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
-	 * <ul>
-	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
-	 * sink instances will write to the same partition.</li>
-	 * <li>If the number of Kafka partitions is higher than the number of sink instances, some
-	 * Kafka partitions won't receive data.</li>
-	 * </ul>
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka08JsonTableSink(String topic, Properties properties) {
-		super(topic, properties, new FlinkFixedPartitioner<>());
-	}
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
-	 * topic with custom partition assignment.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
-	 * topic with custom partition assignment.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 *
-	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
-	 *             producing to multiple topics. Use
-	 *             {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
-	 */
-	@Deprecated
-	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
-	}
-
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-			String topic,
-			Properties properties,
-			SerializationSchema<Row> serializationSchema,
-			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
-		return new FlinkKafkaProducer08<>(
-			topic,
-			serializationSchema,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-
-	@Override
-	protected Kafka08JsonTableSink createCopy() {
-		return new Kafka08JsonTableSink(
-			topic,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-}
-
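
The fixed-assignment contract spelled out in the constructor Javadoc above reduces to a subtask-to-partition modulo mapping. A minimal sketch of that arithmetic (not the Flink source itself, just the behavior the two bullet points describe):

    public class FixedAssignmentSketch {

        // Each parallel sink subtask always targets exactly one partition.
        static int fixedPartition(int subtaskIndex, int[] partitions) {
            return partitions[subtaskIndex % partitions.length];
        }

        public static void main(String[] args) {
            int[] partitions = {0, 1};
            // 4 subtasks, 2 partitions: subtasks 0 and 2 share partition 0,
            // subtasks 1 and 3 share partition 1. With 2 subtasks and 4
            // partitions, partitions 2 and 3 would never receive data.
            for (int subtask = 0; subtask < 4; subtask++) {
                System.out.println("subtask " + subtask + " -> partition " + fixedPartition(subtask, partitions));
            }
        }
    }
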
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
deleted file mode 100644
index dc5a077a89c..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.8.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka08JsonTableSource extends KafkaJsonTableSource {
-
-	/**
-	 * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
-	 *
-	 * @param topic       Kafka topic to consume.
-	 * @param properties  Properties for the Kafka consumer.
-	 * @param tableSchema The schema of the table.
-	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka08JsonTableSource(
-		String topic,
-		Properties properties,
-		TableSchema tableSchema,
-		TableSchema jsonSchema) {
-
-		super(topic, properties, tableSchema, jsonSchema);
-	}
-
-	/**
-	 * Sets the flag that specifies the behavior in case of missing fields.
-	 * If set to true, the TableSource fails on missing fields; if set to false, missing fields are set to null.
-	 *
-	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		super.setFailOnMissingField(failOnMissingField);
-	}
-
-	/**
-	 * Sets the mapping from table schema fields to JSON schema fields.
-	 *
-	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka08JsonTableSource}.
-	 * @return A builder to configure and create a {@link Kafka08JsonTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Kafka08JsonTableSource.Builder builder() {
-		return new Kafka08JsonTableSource.Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka08JsonTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaJsonTableSource.Builder<Kafka08JsonTableSource, Kafka08JsonTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return false;
-		}
-
-		@Override
-		protected Kafka08JsonTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka08JsonTableSource}.
-		 *
-		 * @return A configured {@link Kafka08JsonTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka08JsonTableSource build() {
-			Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getJsonSchema());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
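
The per-setter deprecations above map one-to-one onto descriptor calls: setFailOnMissingField becomes Json#failOnMissingField, and setRowtimeAttributeDescriptor becomes a Rowtime block on the schema. A hedged sketch for the JSON source (topic, field names, and the one-minute watermark bound are invented for illustration):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;

    public class JsonSourceMigrationSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            tableEnv
                .connect(new Kafka()
                    .version("0.8")
                    .topic("events")                                  // placeholder topic
                    .property("bootstrap.servers", "localhost:9092")
                    .property("zookeeper.connect", "localhost:2181"))
                .withFormat(new Json()
                    .failOnMissingField(false)                        // replaces setFailOnMissingField(false)
                    .deriveSchema())                                  // derive the JSON schema from the table schema
                .withSchema(new Schema()
                    .field("user", Types.STRING)
                    .field("ts", Types.SQL_TIMESTAMP)
                    .rowtime(new Rowtime()                            // replaces setRowtimeAttributeDescriptor(...)
                        .timestampsFromField("ts")
                        .watermarksPeriodicBounded(60000)))
                .inAppendMode()
                .registerTableSource("Events");
        }
    }
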
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
deleted file mode 100644
index bc7b9b6d1ef..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka08AvroTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka08AvroTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) AvroRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer08.class;
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
deleted file mode 100644
index aa5fa1609bd..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
-
-import java.util.Properties;
-
-/**
- * Tests for the {@link Kafka08JsonTableSink}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sinks.
- */
-@Deprecated
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
-
-	@Override
-	protected KafkaTableSinkBase createTableSink(
-			String topic,
-			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner) {
-
-		return new Kafka08JsonTableSink(
-			topic,
-			properties,
-			partitioner);
-	}
-
-	@Override
-	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
-		return JsonRowSerializationSchema.class;
-	}
-
-	@Override
-	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
-		return FlinkKafkaProducer08.class;
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
deleted file mode 100644
index 915ce42d313..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-
-/**
- * Tests for legacy Kafka08JsonTableSourceFactory.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_08;
-	}
-
-	@Override
-	protected KafkaJsonTableSource.Builder builder() {
-		return Kafka08JsonTableSource.builder();
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
deleted file mode 100644
index dba8f7dee4e..00000000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka08JsonTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka08JsonTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer08.class;
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 5dc394e27f3..28812919550 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -76,22 +76,6 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
@@ -100,22 +84,6 @@ under the License.
 
 		<!-- test dependencies -->
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
deleted file mode 100644
index 4352d7e400d..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka09AvroTableSource extends KafkaAvroTableSource {
-
-	/**
-	 * Creates a Kafka 0.9 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param schema     Schema of the produced table.
-	 * @param record     Avro specific record.
-	 */
-	public Kafka09AvroTableSource(
-		String topic,
-		Properties properties,
-		TableSchema schema,
-		Class<? extends SpecificRecordBase> record) {
-
-		super(
-			topic,
-			properties,
-			schema,
-			record);
-	}
-
-	/**
-	 * Sets a mapping from schema fields to fields of the produced Avro record.
-	 *
-	 * <p>A field mapping is required if the fields of the produced table should be named differently than
-	 * the fields of the Avro record.
-	 * The key of the provided Map refers to the field of the table schema,
-	 * the value to the field of the Avro record.</p>
-	 *
-	 * @param fieldMapping A mapping from schema fields to Avro fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka09AvroTableSource}.
-	 *
-	 * @return A builder to configure and create a {@link Kafka09AvroTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Builder builder() {
-		return new Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka09AvroTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaAvroTableSource.Builder<Kafka09AvroTableSource, Kafka09AvroTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return false;
-		}
-
-		@Override
-		protected Kafka09AvroTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka09AvroTableSource}.
-		 *
-		 * @return A configured {@link Kafka09AvroTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka09AvroTableSource build() {
-			Kafka09AvroTableSource tableSource = new Kafka09AvroTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getAvroRecordClass());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
-
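
The setFieldMapping override removed here (and in the 0.8 classes earlier) has a direct descriptor counterpart: Schema#from declares the table column under its new name and points it at the originating record field. A small sketch, with column and record field names invented for illustration:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.descriptors.Schema;

    public class FieldMappingSketch {
        public static void main(String[] args) {
            // Table column "userName" is read from record field "user_name";
            // this replaces fieldMapping.put("userName", "user_name").
            Schema schema = new Schema()
                .field("userName", Types.STRING).from("user_name")
                .field("amount", Types.DOUBLE);
        }
    }
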
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
deleted file mode 100644
index cd27f8371c2..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka09JsonTableSink extends KafkaJsonTableSink {
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
-	 * topic with fixed partition assignment.
-	 *
-	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
-	 * <ul>
-	 * <li>If the number of Kafka partitions is less than the number of sink instances, different
-	 * sink instances will write to the same partition.</li>
-	 * <li>If the number of Kafka partitions is higher than the number of sink instances, some
-	 * Kafka partitions won't receive data.</li>
-	 * </ul>
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka09JsonTableSink(String topic, Properties properties) {
-		super(topic, properties, new FlinkFixedPartitioner<>());
-	}
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
-	 * topic with custom partition assignment.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	/**
-	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
-	 * topic with custom partition assignment.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 *
-	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
-	 *             producing to multiple topics. Use
-	 *             {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
-	 */
-	@Deprecated
-	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
-	}
-
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-			String topic,
-			Properties properties,
-			SerializationSchema<Row> serializationSchema,
-			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
-		return new FlinkKafkaProducer09<>(
-			topic,
-			serializationSchema,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-
-	@Override
-	protected Kafka09JsonTableSink createCopy() {
-		return new Kafka09JsonTableSink(
-			topic,
-			properties,
-			partitioner.orElse(new FlinkFixedPartitioner<>()));
-	}
-}
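
On the sink side, the descriptor path registers a table sink instead of instantiating a version-specific class, and the fixed partitioning that the deleted constructors defaulted to is expressed on the Kafka descriptor. A sketch under the same placeholder assumptions (topic, servers, and field names invented; assumes the Kafka table factory supports sink registration in this Flink version):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    public class JsonSinkMigrationSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            tableEnv
                .connect(new Kafka()
                    .version("0.9")
                    .topic("results")                                 // placeholder topic
                    .property("bootstrap.servers", "localhost:9092")
                    .sinkPartitionerFixed())                          // replaces the FlinkFixedPartitioner default
                .withFormat(new Json().deriveSchema())
                .withSchema(new Schema()
                    .field("user", Types.STRING)
                    .field("cnt", Types.LONG))
                .inAppendMode()
                .registerTableSink("Results");
        }
    }
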
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
deleted file mode 100644
index db1df3d889e..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-public class Kafka09JsonTableSource extends KafkaJsonTableSource {
-
-	/**
-	 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
-	 *
-	 * @param topic       Kafka topic to consume.
-	 * @param properties  Properties for the Kafka consumer.
-	 * @param tableSchema The schema of the table.
-	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public Kafka09JsonTableSource(
-		String topic,
-		Properties properties,
-		TableSchema tableSchema,
-		TableSchema jsonSchema) {
-
-		super(topic, properties, tableSchema, jsonSchema);
-	}
-
-	/**
-	 * Sets the flag that specifies the behavior in case of missing fields.
-	 * If set to true, the TableSource fails on missing fields; if set to false, missing fields are set to null.
-	 *
-	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		super.setFailOnMissingField(failOnMissingField);
-	}
-
-	/**
-	 * Sets the mapping from table schema fields to JSON schema fields.
-	 *
-	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setFieldMapping(Map<String, String> fieldMapping) {
-		super.setFieldMapping(fieldMapping);
-	}
-
-	/**
-	 * Declares a field of the schema to be a processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	@Override
-	public void setProctimeAttribute(String proctimeAttribute) {
-		super.setProctimeAttribute(proctimeAttribute);
-	}
-
-	/**
-	 * Declares a field of the schema to be a rowtime attribute.
-	 *
-	 * @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
-		Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
-		super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-	}
-
-	@Override
-	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
-		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
-	}
-
-	/**
-	 * Returns a builder to configure and create a {@link Kafka09JsonTableSource}.
-	 * @return A builder to configure and create a {@link Kafka09JsonTableSource}.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static Kafka09JsonTableSource.Builder builder() {
-		return new Kafka09JsonTableSource.Builder();
-	}
-
-	/**
-	 * A builder to configure and create a {@link Kafka09JsonTableSource}.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	public static class Builder extends KafkaJsonTableSource.Builder<Kafka09JsonTableSource, Kafka09JsonTableSource.Builder> {
-
-		@Override
-		protected boolean supportsKafkaTimestamps() {
-			return false;
-		}
-
-		@Override
-		protected Kafka09JsonTableSource.Builder builder() {
-			return this;
-		}
-
-		/**
-		 * Builds and configures a {@link Kafka09JsonTableSource}.
-		 *
-		 * @return A configured {@link Kafka09JsonTableSource}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		@Override
-		public Kafka09JsonTableSource build() {
-			Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
-				getTopic(),
-				getKafkaProps(),
-				getTableSchema(),
-				getJsonSchema());
-			super.configureTableSource(tableSource);
-			return tableSource;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
deleted file mode 100644
index 5f5e80d3347..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka09AvroTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka09AvroTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) AvroRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer09.class;
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
deleted file mode 100644
index 29cfa93ef2e..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
-
-import java.util.Properties;
-
-/**
- * Tests for the {@link Kafka09JsonTableSink}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sinks.
- */
-@Deprecated
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
-
-	@Override
-	protected KafkaTableSinkBase createTableSink(
-			String topic,
-			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner) {
-
-		return new Kafka09JsonTableSink(
-			topic,
-			properties,
-			partitioner);
-	}
-
-	@Override
-	protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
-		return JsonRowSerializationSchema.class;
-	}
-
-	@Override
-	protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
-		return FlinkKafkaProducer09.class;
-	}
-}
-
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java
deleted file mode 100644
index e0437a10c4e..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
-
-/**
- * Tests for legacy Kafka09JsonTableSourceFactory.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka09JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
-
-	@Override
-	protected String version() {
-		return CONNECTOR_VERSION_VALUE_09;
-	}
-
-	@Override
-	protected KafkaJsonTableSource.Builder builder() {
-		return Kafka09JsonTableSource.builder();
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
deleted file mode 100644
index 1a630b1007c..00000000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
-import org.apache.flink.types.Row;
-
-/**
- * Tests for the {@link Kafka09JsonTableSource}.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
-
-	@Override
-	protected KafkaTableSourceBase.Builder getBuilder() {
-		return Kafka09JsonTableSource.builder();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-		return (Class) JsonRowDeserializationSchema.class;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer09.class;
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index 78ebfa6116c..f659e96ce42 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -44,12 +44,6 @@ under the License.
 
 		<!-- core dependencies -->
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-jackson</artifactId>
-			<scope>provided</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -66,22 +60,6 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
@@ -189,22 +167,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
deleted file mode 100644
index 1e9568fd0f9..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka Avro {@link StreamTableSource}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-@Internal
-public abstract class KafkaAvroTableSource extends KafkaTableSourceBase {
-
-	/**
-	 * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
-	 *
-	 * @param topic            Kafka topic to consume.
-	 * @param properties       Properties for the Kafka consumer.
-	 * @param schema           Schema of the produced table.
-	 * @param avroRecordClass  Class of the Avro record that is read from the Kafka topic.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	protected KafkaAvroTableSource(
-		String topic,
-		Properties properties,
-		TableSchema schema,
-		Class<? extends SpecificRecordBase> avroRecordClass) {
-
-		super(
-			schema,
-			topic,
-			properties,
-			new AvroRowDeserializationSchema(avroRecordClass));
-	}
-
-	@Override
-	public String explainSource() {
-		return "KafkaAvroTableSource";
-	}
-
-	//////// HELPER METHODS
-
-	/**
-	 * Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
-	 * KafkaAvroTableSource.
-	 *
-	 * @param <T> Type of the KafkaAvroTableSource produced by the builder.
-	 * @param <B> Type of the KafkaAvroTableSource.Builder subclass.
-	 *
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
-		extends KafkaTableSourceBase.Builder<T, B> {
-
-		private Class<? extends SpecificRecordBase> avroClass;
-
-		private Map<String, String> fieldMapping;
-
-		/**
-		 * Sets the class of the Avro records that are read from the Kafka topic.
-		 *
-		 * @param avroClass The class of the Avro records that are read from the Kafka topic.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
-			this.avroClass = avroClass;
-			return builder();
-		}
-
-		/**
-		 * Sets a mapping from schema fields to fields of the produced Avro record.
-		 *
-		 * <p>A field mapping is required if the fields of the produced table should be named
-		 * differently from the fields of the Avro record.
-		 * The key of the provided Map refers to the field of the table schema,
-		 * the value to the field of the Avro record.</p>
-		 *
-		 * @param schemaToAvroMapping A mapping from schema fields to Avro fields.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
-			this.fieldMapping = schemaToAvroMapping;
-			return builder();
-		}
-
-		/**
-		 * Returns the configured Avro class.
-		 *
-		 * @return The configured Avro class.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
-			return this.avroClass;
-		}
-
-		@Override
-		protected void configureTableSource(T source) {
-			super.configureTableSource(source);
-			source.setFieldMapping(this.fieldMapping);
-		}
-	}
-}
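
The deprecation notices above all point to the descriptor API as the
migration path. A minimal sketch of the equivalent descriptor-based
definition of an Avro-backed Kafka source, assuming a StreamTableEnvironment
named tableEnv and the flink-avro Avro format descriptor; the topic,
properties and field names are illustrative (SchemaRecord is the
Avro-generated class used by the tests in this PR):

    tableEnv.connect(
            new Kafka()
                .version("0.10")
                .topic("my-avro-topic")      // illustrative topic
                .properties(props))
        .withFormat(new Avro().recordClass(SchemaRecord.class))
        .withSchema(new Schema()
            .field("field1", Types.LONG)
            .field("field2", Types.STRING))
        .inAppendMode()
        .registerTableSource("MyAvroTable"); // illustrative table name
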
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
deleted file mode 100644
index d84eb891044..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
-
-import java.util.Properties;
-
-/**
- * Base class for {@link KafkaTableSinkBase} subclasses that serialize data in JSON format.
- *
- * @deprecated Use table descriptors instead of implementation-specific classes.
- */
-@Deprecated
-@Internal
-public abstract class KafkaJsonTableSink extends KafkaTableSinkBase {
-
-	/**
-	 * Creates KafkaJsonTableSink.
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	@Override
-	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
-		return new JsonRowSerializationSchema(rowSchema);
-	}
-}
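
The removed JSON sink has the same descriptor-based replacement on the sink
side; a rough sketch, assuming the registerTableSink method of the
descriptor API (topic, fields and table name are illustrative):

    tableEnv.connect(
            new Kafka()
                .version("0.10")
                .topic("my-json-topic")    // illustrative topic
                .properties(props))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("name", Types.STRING)
            .field("count", Types.BIG_DEC))
        .inAppendMode()
        .registerTableSink("MyJsonSink");  // illustrative table name
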
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
deleted file mode 100644
index a9db97938d6..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka JSON {@link StreamTableSource}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
- *
- * <p>Both the field names and the field types are used to parse the JSON messages.
- *
- * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
- *             with descriptors for schema and format instead. Descriptors allow for
- *             implementation-agnostic definition of tables. See also
- *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
- */
-@Deprecated
-@Internal
-public abstract class KafkaJsonTableSource extends KafkaTableSourceBase {
-
-	/**
-	 * Creates a generic Kafka JSON {@link StreamTableSource}.
-	 *
-	 * @param topic       Kafka topic to consume.
-	 * @param properties  Properties for the Kafka consumer.
-	 * @param tableSchema The schema of the table.
-	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected KafkaJsonTableSource(
-		String topic,
-		Properties properties,
-		TableSchema tableSchema,
-		TableSchema jsonSchema) {
-
-		super(
-			tableSchema,
-			topic,
-			properties,
-			new JsonRowDeserializationSchema(jsonSchema.toRowType()));
-	}
-
-	@Override
-	public String explainSource() {
-		return "KafkaJsonTableSource";
-	}
-
-	//////// SETTERS FOR OPTIONAL PARAMETERS
-
-	/**
-	 * Sets the flag that specifies the behavior in case of missing fields.
-	 * If set to true, the TableSource fails on missing fields; if set to false, missing fields are set to null.
-	 *
-	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setFailOnMissingField(boolean failOnMissingField) {
-		((JsonRowDeserializationSchema) getDeserializationSchema()).setFailOnMissingField(failOnMissingField);
-	}
-
-	//////// HELPER METHODS
-
-	/**
-	 * Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
-	 * KafkaJsonTableSource.
-	 *
-	 * @param <T> Type of the KafkaJsonTableSource produced by the builder.
-	 * @param <B> Type of the KafkaJsonTableSource.Builder subclass.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
-		extends KafkaTableSourceBase.Builder<T, B> {
-
-		private TableSchema jsonSchema;
-
-		private Map<String, String> fieldMapping;
-
-		private boolean failOnMissingField = false;
-
-		/**
-		 * Sets the schema of the JSON-encoded Kafka messages.
-		 * If not set, the JSON messages are decoded with the table schema.
-		 *
-		 * @param jsonSchema The schema of the JSON-encoded Kafka messages.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B forJsonSchema(TableSchema jsonSchema) {
-			this.jsonSchema = jsonSchema;
-			return builder();
-		}
-
-		/**
-		 * Sets a mapping from schema fields to fields of the JSON schema.
-		 *
-		 * <p>A field mapping is required if the fields of the produced table should be named
-		 * differently from the fields of the JSON records.
-		 * The key of the provided Map refers to the field of the table schema,
-		 * the value to the field in the JSON schema.</p>
-		 *
-		 * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
-			this.fieldMapping = tableToJsonMapping;
-			return builder();
-		}
-
-		/**
-		 * Sets the flag that determines whether to fail if a field is missing.
-		 *
-		 * @param failOnMissingField If set to true, the TableSource fails if there is a missing
-		 *                           field.
-		 *                           If set to false, a missing field is set to null.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B failOnMissingField(boolean failOnMissingField) {
-			this.failOnMissingField = failOnMissingField;
-			return builder();
-		}
-
-		/**
-		 * Returns the configured JSON schema. If no JSON schema was configured, the table schema
-		 * is returned.
-		 *
-		 * @return The JSON schema for the TableSource.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected TableSchema getJsonSchema() {
-			if (jsonSchema != null) {
-				return this.jsonSchema;
-			} else {
-				return getTableSchema();
-			}
-		}
-
-		/**
-		 * Configures a TableSource with optional parameters.
-		 *
-		 * @param source The TableSource to configure.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected void configureTableSource(T source) {
-			super.configureTableSource(source);
-			// configure field mapping
-			source.setFieldMapping(this.fieldMapping);
-			// configure missing field behavior
-			source.setFailOnMissingField(this.failOnMissingField);
-		}
-	}
-}
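
For the JSON source, the descriptor-based equivalent can be modeled directly
on the factory test deleted further below; a sketch with illustrative names
(props and jsonSchema as in that test, tableEnv a StreamTableEnvironment):

    tableEnv.connect(
            new Kafka()
                .version("0.10")
                .topic("test-topic")
                .properties(props)
                .startFromEarliest())
        .withFormat(new Json()
            .jsonSchema(jsonSchema)
            .failOnMissingField(true))
        .withSchema(new Schema()
            .field("fruit-name", Types.STRING).from("name")
            .field("count", Types.BIG_DEC) // no from(), so it must match the input
            .field("proc-time", Types.SQL_TIMESTAMP).proctime())
        .inAppendMode()
        .registerTableSource("Fruits");    // illustrative table name
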
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index 4755cffae69..216aec1c026 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -31,7 +31,6 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Arrays;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
@@ -45,8 +44,6 @@
 @Internal
 public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
 
-	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
-
 	/** The schema of the table. */
 	private final Optional<TableSchema> schema;
 
@@ -57,15 +54,11 @@
 	protected final Properties properties;
 
 	/** Serialization schema for encoding records to Kafka. */
-	protected Optional<SerializationSchema<Row>> serializationSchema;
+	protected final Optional<SerializationSchema<Row>> serializationSchema;
 
 	/** Partitioner to select Kafka partition for each item. */
 	protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
-	// legacy variables
-	protected String[] fieldNames;
-	protected TypeInformation[] fieldTypes;
-
 	protected KafkaTableSinkBase(
 			TableSchema schema,
 			String topic,
@@ -80,26 +73,6 @@ protected KafkaTableSinkBase(
 			serializationSchema, "Serialization schema must not be null."));
 	}
 
-	/**
-	 * Creates KafkaTableSinkBase.
-	 *
-	 * @param topic                 Kafka topic to write to.
-	 * @param properties            Properties for the Kafka producer.
-	 * @param partitioner           Partitioner to select Kafka partition for each item
-	 * @deprecated Use table descriptors instead of implementation-specific classes.
-	 */
-	@Deprecated
-	public KafkaTableSinkBase(
-			String topic,
-			Properties properties,
-			FlinkKafkaPartitioner<Row> partitioner) {
-		this.schema = Optional.empty();
-		this.topic = Preconditions.checkNotNull(topic, "topic");
-		this.properties = Preconditions.checkNotNull(properties, "properties");
-		this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
-		this.serializationSchema = Optional.empty();
-	}
-
 	/**
 	 * Returns the version-specific Kafka producer.
 	 *
@@ -115,28 +88,6 @@ public KafkaTableSinkBase(
 		SerializationSchema<Row> serializationSchema,
 		Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
-	/**
-	 * Create serialization schema for converting table rows into bytes.
-	 *
-	 * @param rowSchema the schema of the row to serialize.
-	 * @return Instance of serialization schema
-	 * @deprecated Use the constructor to pass a serialization schema instead.
-	 */
-	@Deprecated
-	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
-		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
-	}
-
-	/**
-	 * Create a deep copy of this sink.
-	 *
-	 * @return Deep copy of this sink
-	 */
-	@Deprecated
-	protected KafkaTableSinkBase createCopy() {
-		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
-	}
-
 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
 		SinkFunction<Row> kafkaProducer = createKafkaProducer(
@@ -144,7 +95,7 @@ public void emitDataStream(DataStream<Row> dataStream) {
 			properties,
 			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
 			partitioner);
-		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
+		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), getFieldNames()));
 	}
 
 	@Override
@@ -155,12 +106,12 @@ public void emitDataStream(DataStream<Row> dataStream) {
 	}
 
 	public String[] getFieldNames() {
-		return schema.map(TableSchema::getFieldNames).orElse(fieldNames);
+		return schema.map(TableSchema::getFieldNames).get();
 	}
 
 	@Override
 	public TypeInformation<?>[] getFieldTypes() {
-		return schema.map(TableSchema::getFieldTypes).orElse(fieldTypes);
+		return schema.map(TableSchema::getFieldTypes).get();
 	}
 
 	@Override
@@ -170,17 +121,7 @@ public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fi
 			throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
 		}
 
-		// legacy code
-		KafkaTableSinkBase copy = createCopy();
-		copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
-		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-			"Number of provided field names and types does not match.");
-
-		RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
-		copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema));
-
-		return copy;
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
 	}
 
 	@Override
@@ -196,9 +137,7 @@ public boolean equals(Object o) {
 			Objects.equals(topic, that.topic) &&
 			Objects.equals(properties, that.properties) &&
 			Objects.equals(serializationSchema, that.serializationSchema) &&
-			Objects.equals(partitioner, that.partitioner) &&
-			Arrays.equals(fieldNames, that.fieldNames) &&
-			Arrays.equals(fieldTypes, that.fieldTypes);
+			Objects.equals(partitioner, that.partitioner);
 	}
 
 	@Override
@@ -209,8 +148,6 @@ public int hashCode() {
 			properties,
 			serializationSchema,
 			partitioner);
-		result = 31 * result + Arrays.hashCode(fieldNames);
-		result = 31 * result + Arrays.hashCode(fieldTypes);
 		return result;
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
index 7078fc90ce4..8f37eb820fa 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
@@ -28,15 +28,11 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
-import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
-import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
 import org.apache.flink.table.util.TableConnectorUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
@@ -68,13 +64,13 @@
 	private final TableSchema schema;
 
 	/** Field name of the processing time attribute, null if no processing time field is defined. */
-	private Optional<String> proctimeAttribute;
+	private final Optional<String> proctimeAttribute;
 
 	/** Descriptor for a rowtime attribute. */
-	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+	private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
 
 	/** Mapping for the fields of the table schema to fields of the physical returned type. */
-	private Optional<Map<String, String>> fieldMapping;
+	private final Optional<Map<String, String>> fieldMapping;
 
 	// Kafka-specific attributes
 
@@ -88,10 +84,10 @@
 	private final DeserializationSchema<Row> deserializationSchema;
 
 	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-	private StartupMode startupMode;
+	private final StartupMode startupMode;
 
 	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
-	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+	private final Map<KafkaTopicPartition, Long> specificStartupOffsets;
 
 	/**
 	 * Creates a generic Kafka {@link StreamTableSource}.
@@ -323,63 +319,6 @@ public int hashCode() {
 		return rowtimeAttributeDescriptors;
 	}
 
-	//////// SETTERS FOR OPTIONAL PARAMETERS
-
-	/**
-	 * Declares a field of the schema to be the processing time attribute.
-	 *
-	 * @param proctimeAttribute The name of the field that becomes the processing time field.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setProctimeAttribute(String proctimeAttribute) {
-		this.proctimeAttribute = validateProctimeAttribute(Optional.ofNullable(proctimeAttribute));
-	}
-
-	/**
-	 * Declares a list of fields to be rowtime attributes.
-	 *
-	 * @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
-		this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
-	}
-
-	/**
-	 * Sets the startup mode of the TableSource.
-	 *
-	 * @param startupMode The startup mode.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setStartupMode(StartupMode startupMode) {
-		this.startupMode = Preconditions.checkNotNull(startupMode);
-	}
-
-	/**
-	 * Sets the startup offsets of the TableSource; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
-	 *
-	 * @param specificStartupOffsets The startup offsets for different partitions.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-		this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
-	}
-
-	/**
-	 * Mapping for the fields of the table schema to fields of the physical returned type.
-	 *
-	 * @param fieldMapping The mapping from table schema fields to format schema fields.
-	 * @deprecated Use table descriptors instead of implementation-specific builders.
-	 */
-	@Deprecated
-	protected void setFieldMapping(Map<String, String> fieldMapping) {
-		this.fieldMapping = Optional.ofNullable(fieldMapping);
-	}
-
 	//////// ABSTRACT METHODS FOR SUBCLASSES
 
 	/**
@@ -395,296 +334,4 @@ protected void setFieldMapping(Map<String, String> fieldMapping) {
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema);
 
-	/**
-	 * Abstract builder for a {@link KafkaTableSourceBase} to be extended by builders of subclasses of
-	 * KafkaTableSourceBase.
-	 *
-	 * @param <T> Type of the KafkaTableSourceBase produced by the builder.
-	 * @param <B> Type of the KafkaTableSourceBase.Builder subclass.
-	 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-	 *             with descriptors for schema and format instead. Descriptors allow for
-	 *             implementation-agnostic definition of tables. See also
-	 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-	 */
-	@Deprecated
-	protected abstract static class Builder<T extends KafkaTableSourceBase, B extends KafkaTableSourceBase.Builder> {
-
-		private String topic;
-
-		private Properties kafkaProps;
-
-		private TableSchema schema;
-
-		private String proctimeAttribute;
-
-		private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
-
-		/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-		private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
-
-		/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
-		private Map<KafkaTopicPartition, Long> specificStartupOffsets = null;
-
-		/**
-		 * Sets the topic from which the table is read.
-		 *
-		 * @param topic The topic from which the table is read.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B forTopic(String topic) {
-			Preconditions.checkNotNull(topic, "Topic must not be null.");
-			Preconditions.checkArgument(this.topic == null, "Topic has already been set.");
-			this.topic = topic;
-			return builder();
-		}
-
-		/**
-		 * Sets the configuration properties for the Kafka consumer.
-		 *
-		 * @param props The configuration properties for the Kafka consumer.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withKafkaProperties(Properties props) {
-			Preconditions.checkNotNull(props, "Properties must not be null.");
-			Preconditions.checkArgument(this.kafkaProps == null, "Properties have already been set.");
-			this.kafkaProps = props;
-			return builder();
-		}
-
-		/**
-		 * Sets the schema of the produced table.
-		 *
-		 * @param schema The schema of the produced table.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withSchema(TableSchema schema) {
-			Preconditions.checkNotNull(schema, "Schema must not be null.");
-			Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
-			this.schema = schema;
-			return builder();
-		}
-
-		/**
-		 * Configures a field of the table to be a processing time attribute.
-		 * The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
-		 *
-		 * @param proctimeAttribute The name of the processing time attribute in the table schema.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withProctimeAttribute(String proctimeAttribute) {
-			Preconditions.checkNotNull(proctimeAttribute, "Proctime attribute must not be null.");
-			Preconditions.checkArgument(!proctimeAttribute.isEmpty(), "Proctime attribute must not be empty.");
-			Preconditions.checkArgument(this.proctimeAttribute == null, "Proctime attribute has already been set.");
-			this.proctimeAttribute = proctimeAttribute;
-			return builder();
-		}
-
-		/**
-		 * Configures a field of the table to be a rowtime attribute.
-		 * The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
-		 *
-		 * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
-		 * @param timestampExtractor The {@link TimestampExtractor} to extract the rowtime attribute from the physical type.
-		 * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withRowtimeAttribute(
-				String rowtimeAttribute,
-				TimestampExtractor timestampExtractor,
-				WatermarkStrategy watermarkStrategy) {
-			Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
-			Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
-			Preconditions.checkNotNull(timestampExtractor, "Timestamp extractor must not be null.");
-			Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
-			Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
-				"Currently, only one rowtime attribute is supported.");
-
-			this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
-				rowtimeAttribute,
-				timestampExtractor,
-				watermarkStrategy);
-			return builder();
-		}
-
-		/**
-		 * Configures the Kafka timestamp to be a rowtime attribute.
-		 *
-		 * <p>Note: Kafka supports message timestamps only since version 0.10.</p>
-		 *
-		 * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
-		 * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
-		 * @return The builder.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B withKafkaTimestampAsRowtimeAttribute(
-				String rowtimeAttribute,
-				WatermarkStrategy watermarkStrategy) {
-
-			Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
-			Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
-			Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
-			Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
-				"Currently, only one rowtime attribute is supported.");
-			Preconditions.checkArgument(supportsKafkaTimestamps(), "Kafka timestamps are only supported since Kafka 0.10.");
-
-			this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
-				rowtimeAttribute,
-				new StreamRecordTimestamp(),
-				watermarkStrategy);
-			return builder();
-		}
-
-		/**
-		 * Configures the TableSource to start reading from the earliest offset for all partitions.
-		 *
-		 * @see FlinkKafkaConsumerBase#setStartFromEarliest()
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B fromEarliest() {
-			this.startupMode = StartupMode.EARLIEST;
-			this.specificStartupOffsets = null;
-			return builder();
-		}
-
-		/**
-		 * Configures the TableSource to start reading from the latest offset for all partitions.
-		 *
-		 * @see FlinkKafkaConsumerBase#setStartFromLatest()
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B fromLatest() {
-			this.startupMode = StartupMode.LATEST;
-			this.specificStartupOffsets = null;
-			return builder();
-		}
-
-		/**
-		 * Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
-		 *
-		 * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B fromGroupOffsets() {
-			this.startupMode = StartupMode.GROUP_OFFSETS;
-			this.specificStartupOffsets = null;
-			return builder();
-		}
-
-		/**
-		 * Configures the TableSource to start reading partitions from specific offsets, set independently for each partition.
-		 *
-		 * @param specificStartupOffsets the specified offsets for partitions
-		 * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-			this.startupMode = StartupMode.SPECIFIC_OFFSETS;
-			this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
-			return builder();
-		}
-
-		/**
-		 * Returns the configured topic.
-		 *
-		 * @return the configured topic.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected String getTopic() {
-			return this.topic;
-		}
-
-		/**
-		 * Returns the configured Kafka properties.
-		 *
-		 * @return the configured Kafka properties.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected Properties getKafkaProps() {
-			return this.kafkaProps;
-		}
-
-		/**
-		 * Returns the configured table schema.
-		 *
-		 * @return the configured table schema.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected TableSchema getTableSchema() {
-			return this.schema;
-		}
-
-		/**
-		 * True if the KafkaSource supports Kafka timestamps, false otherwise.
-		 *
-		 * @return True if the KafkaSource supports Kafka timestamps, false otherwise.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected abstract boolean supportsKafkaTimestamps();
-
-		/**
-		 * Configures a TableSource with optional parameters.
-		 *
-		 * @param tableSource The TableSource to configure.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected void configureTableSource(T tableSource) {
-			// configure processing time attributes
-			tableSource.setProctimeAttribute(proctimeAttribute);
-			// configure rowtime attributes
-			if (rowtimeAttributeDescriptor == null) {
-				tableSource.setRowtimeAttributeDescriptors(Collections.emptyList());
-			} else {
-				tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
-			}
-			tableSource.setStartupMode(startupMode);
-			switch (startupMode) {
-				case EARLIEST:
-				case LATEST:
-				case GROUP_OFFSETS:
-					break;
-				case SPECIFIC_OFFSETS:
-					tableSource.setSpecificStartupOffsets(specificStartupOffsets);
-					break;
-			}
-		}
-
-		/**
-		 * Returns the builder.
-		 * @return the builder.
-		 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
-		 *             with descriptors for schema and format instead. Descriptors allow for
-		 *             implementation-agnostic definition of tables. See also
-		 *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
-		 */
-		@Deprecated
-		protected abstract B builder();
-
-		/**
-		 * Builds the configured {@link KafkaTableSourceBase}.
-		 * @return The configured {@link KafkaTableSourceBase}.
-		 * @deprecated Use table descriptors instead of implementation-specific builders.
-		 */
-		@Deprecated
-		protected abstract KafkaTableSourceBase build();
-	}
 }
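
The removed builder's startup-mode setters map one-to-one onto methods of the
Kafka descriptor; a sketch of the specific-offsets case, mirroring the
deleted factory test below (topic and offsets are illustrative):

    final Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 100L);
    offsets.put(1, 123L);

    new Kafka()
        .version("0.10")
        .topic("test-topic")
        .properties(props)
        .startFromSpecificOffsets(offsets);
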
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
deleted file mode 100644
index 9d55473315b..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- *
- * <p>Fields can be accessed by calling objectNode.get(&lt;name&gt;).as(&lt;type&gt;)
- *
- * @deprecated Please use {@link JsonNodeDeserializationSchema} in the "flink-json" module.
- */
-@PublicEvolving
-@Deprecated
-public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
-	// delegate everything to the parent class
-}
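
Since the removed class only delegated to its parent, migration is an import
change to the flink-json module; a minimal sketch (the consumer class, topic
and props are illustrative, ObjectNode is the shaded Jackson type returned by
JsonNodeDeserializationSchema):

    import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

    FlinkKafkaConsumer010<ObjectNode> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new JsonNodeDeserializationSchema(), props);
    // individual fields: objectNode.get("name").asText(), objectNode.get("count").asInt(), ...
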
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
deleted file mode 100644
index a09f10309de..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
- * the "flink-json" module.
- */
-@PublicEvolving
-@Deprecated
-public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
-
-	/**
-	 * Creates a JSON deserialization schema for the given fields and types.
-	 *
-	 * @param typeInfo   Type information describing the result type. Both the field names
-	 *                   and the field types are used to parse the JSON messages.
-	 *
-	 * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
-	 * the "flink-json" module.
-	 */
-	@Deprecated
-	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-		super(typeInfo);
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
deleted file mode 100644
index 83f7519db1b..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-
-/**
- * Serialization schema that serializes an object into JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>Result <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- *
- * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowSerializationSchema} in
- * the "flink-json" module.
- */
-@PublicEvolving
-@Deprecated
-public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
-
-	/**
-	 * Creates a JSON serialization schema for the given fields and types.
-	 *
-	 * @param typeInfo The schema of the rows to encode.
-	 *
-	 * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowSerializationSchema} in
-	 * the "flink-json" module.
-	 */
-	@Deprecated
-	public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
-		super(typeInfo);
-	}
-}
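
Both of the removed schema shims delegated to the flink-json classes of the
same name, so here too only the import changes; e.g., with typeInfo being the
row type information passed to the deleted constructors:

    org.apache.flink.formats.json.JsonRowDeserializationSchema deser =
        new org.apache.flink.formats.json.JsonRowDeserializationSchema(typeInfo);
    org.apache.flink.formats.json.JsonRowSerializationSchema ser =
        new org.apache.flink.formats.json.JsonRowSerializationSchema(typeInfo);
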
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
deleted file mode 100644
index 089a1182518..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
-import org.apache.flink.formats.avro.generated.SchemaRecord;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Types;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-/**
- * Abstract test base for all Kafka Avro table sources.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceBuilderTestBase {
-
-	@Override
-	protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
-		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
-	}
-
-	@Test
-	public void testSameFieldsAvroClass() {
-		KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
-		this.configureBuilder(b);
-
-		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
-
-		// check return type
-		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
-		assertNotNull(returnType);
-		assertEquals(5, returnType.getArity());
-		// check field names
-		assertEquals("field1", returnType.getFieldNames()[0]);
-		assertEquals("field2", returnType.getFieldNames()[1]);
-		assertEquals("time1", returnType.getFieldNames()[2]);
-		assertEquals("time2", returnType.getFieldNames()[3]);
-		assertEquals("field3", returnType.getFieldNames()[4]);
-		// check field types
-		assertEquals(Types.LONG(), returnType.getTypeAt(0));
-		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.LONG(), returnType.getTypeAt(2));
-		assertEquals(Types.LONG(), returnType.getTypeAt(3));
-		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
-
-		// check field mapping
-		assertNull(source.getFieldMapping());
-
-		// check if DataStream type matches with TableSource.getReturnType()
-		assertEquals(source.getReturnType(),
-			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
-	}
-
-	@Test
-	public void testDifferentFieldsAvroClass() {
-		KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
-		super.configureBuilder(b);
-		b.withProctimeAttribute("time2");
-
-		Map<String, String> mapping = new HashMap<>();
-		mapping.put("field1", "otherField1");
-		mapping.put("field2", "otherField2");
-		mapping.put("field3", "otherField3");
-
-		// set Avro class with different fields
-		b.forAvroRecordClass(DifferentSchemaRecord.class);
-		b.withTableToAvroMapping(mapping);
-
-		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
-
-		// check return type
-		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
-		assertNotNull(returnType);
-		assertEquals(6, returnType.getArity());
-		// check field names
-		assertEquals("otherField1", returnType.getFieldNames()[0]);
-		assertEquals("otherField2", returnType.getFieldNames()[1]);
-		assertEquals("otherTime1", returnType.getFieldNames()[2]);
-		assertEquals("otherField3", returnType.getFieldNames()[3]);
-		assertEquals("otherField4", returnType.getFieldNames()[4]);
-		assertEquals("otherField5", returnType.getFieldNames()[5]);
-		// check field types
-		assertEquals(Types.LONG(), returnType.getTypeAt(0));
-		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.LONG(), returnType.getTypeAt(2));
-		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
-		assertEquals(Types.FLOAT(), returnType.getTypeAt(4));
-		assertEquals(Types.INT(), returnType.getTypeAt(5));
-
-		// check field mapping
-		Map<String, String> fieldMapping = source.getFieldMapping();
-		assertNotNull(fieldMapping);
-		assertEquals(3, fieldMapping.size());
-		assertEquals("otherField1", fieldMapping.get("field1"));
-		assertEquals("otherField2", fieldMapping.get("field2"));
-		assertEquals("otherField3", fieldMapping.get("field3"));
-
-		// check if DataStream type matches with TableSource.getReturnType()
-		assertEquals(source.getReturnType(),
-			source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
deleted file mode 100644
index 28f45204de8..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.formats.json.JsonRowSchemaConverter;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.FormatDescriptor;
-import org.apache.flink.table.descriptors.Json;
-import org.apache.flink.table.descriptors.Kafka;
-import org.apache.flink.table.descriptors.Rowtime;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableDescriptor;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceUtil;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for legacy KafkaJsonTableSourceFactory.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public abstract class KafkaJsonTableSourceFactoryTestBase {
-
-	private static final String JSON_SCHEMA =
-		"{" +
-		"  'title': 'Fruit'," +
-		"  'type': 'object'," +
-		"  'properties': {" +
-		"    'name': {" +
-		"      'type': 'string'" +
-		"    }," +
-		"    'count': {" +
-		"      'type': 'integer'" +
-		"    }," +
-		"    'time': {" +
-		"      'description': 'row time'," +
-		"      'type': 'string'," +
-		"      'format': 'date-time'" +
-		"    }" +
-		"  }," +
-		"  'required': ['name', 'count', 'time']" +
-		"}";
-
-	private static final String TOPIC = "test-topic";
-
-	protected abstract String version();
-
-	protected abstract KafkaJsonTableSource.Builder builder();
-
-	@Test
-	public void testTableSourceFromJsonSchema() {
-		testTableSource(
-			new Json()
-				.jsonSchema(JSON_SCHEMA)
-				.failOnMissingField(true)
-		);
-	}
-
-	@Test
-	public void testTableSourceDerivedSchema() {
-		testTableSource(
-			new Json()
-				.deriveSchema()
-				.failOnMissingField(true)
-		);
-	}
-
-	private void testTableSource(FormatDescriptor format) {
-		// construct table source using a builder
-
-		final Map<String, String> tableJsonMapping = new HashMap<>();
-		tableJsonMapping.put("fruit-name", "name");
-		tableJsonMapping.put("name", "name");
-		tableJsonMapping.put("count", "count");
-		tableJsonMapping.put("time", "time");
-
-		final Properties props = new Properties();
-		props.put("group.id", "test-group");
-		props.put("bootstrap.servers", "localhost:1234");
-
-		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-		specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
-		specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
-
-		final KafkaTableSourceBase builderSource = builder()
-				.forJsonSchema(TableSchema.fromTypeInfo(JsonRowSchemaConverter.convert(JSON_SCHEMA)))
-				.failOnMissingField(true)
-				.withTableToJsonMapping(tableJsonMapping)
-				.withKafkaProperties(props)
-				.forTopic(TOPIC)
-				.fromSpecificOffsets(specificOffsets)
-				.withSchema(
-					TableSchema.builder()
-						.field("fruit-name", Types.STRING)
-						.field("count", Types.BIG_DEC)
-						.field("event-time", Types.SQL_TIMESTAMP)
-						.field("proc-time", Types.SQL_TIMESTAMP)
-						.build())
-				.withProctimeAttribute("proc-time")
-				.withRowtimeAttribute("event-time", new ExistingField("time"), new AscendingTimestamps())
-				.build();
-
-		TableSourceUtil.validateTableSource(builderSource);
-
-		// construct table source using descriptors and table source factory
-
-		final Map<Integer, Long> offsets = new HashMap<>();
-		offsets.put(0, 100L);
-		offsets.put(1, 123L);
-
-		final TestTableDescriptor testDesc = new TestTableDescriptor(
-				new Kafka()
-					.version(version())
-					.topic(TOPIC)
-					.properties(props)
-					.startFromSpecificOffsets(offsets))
-			.withFormat(format)
-			.withSchema(
-				new Schema()
-						.field("fruit-name", Types.STRING).from("name")
-						.field("count", Types.BIG_DEC) // no 'from' mapping, so the name must match the input field
-						.field("event-time", Types.SQL_TIMESTAMP).rowtime(
-							new Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
-						.field("proc-time", Types.SQL_TIMESTAMP).proctime())
-			.inAppendMode();
-
-		final Map<String, String> properties = testDesc.toProperties();
-		final TableSource<?> factorySource =
-				TableFactoryService.find(StreamTableSourceFactory.class, properties)
-						.createStreamTableSource(properties);
-
-		assertEquals(builderSource, factorySource);
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
deleted file mode 100644
index 4f95b962936..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-/**
- * Abstract test base for all Kafka JSON table sources.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public abstract class KafkaJsonTableSourceTestBase extends KafkaTableSourceBuilderTestBase {
-
-	@Test
-	public void testJsonEqualsTableSchema() {
-		KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
-		this.configureBuilder(b);
-
-		KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
-
-		// check return type
-		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
-		assertNotNull(returnType);
-		assertEquals(5, returnType.getArity());
-		// check field names
-		assertEquals("field1", returnType.getFieldNames()[0]);
-		assertEquals("field2", returnType.getFieldNames()[1]);
-		assertEquals("time1", returnType.getFieldNames()[2]);
-		assertEquals("time2", returnType.getFieldNames()[3]);
-		assertEquals("field3", returnType.getFieldNames()[4]);
-		// check field types
-		assertEquals(Types.LONG(), returnType.getTypeAt(0));
-		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
-		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
-		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
-
-		// check field mapping
-		assertNull(source.getFieldMapping());
-	}
-
-	@Test
-	public void testCustomJsonSchemaWithMapping() {
-		KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
-		super.configureBuilder(b);
-		b.withProctimeAttribute("time2");
-
-		Map<String, String> mapping = new HashMap<>();
-		mapping.put("field1", "otherField1");
-		mapping.put("field2", "otherField2");
-		mapping.put("field3", "otherField3");
-
-		// set JSON schema with different fields
-		b.forJsonSchema(TableSchema.builder()
-			.field("otherField1", Types.LONG())
-			.field("otherField2", Types.STRING())
-			.field("rowtime", Types.LONG())
-			.field("otherField3", Types.DOUBLE())
-			.field("otherField4", Types.BYTE())
-			.field("otherField5", Types.INT()).build());
-		b.withTableToJsonMapping(mapping);
-		b.withRowtimeAttribute("time1", new ExistingField("timeField1"), new AscendingTimestamps());
-
-		KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
-
-		// check return type
-		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
-		assertNotNull(returnType);
-		assertEquals(6, returnType.getArity());
-		// check field names
-		assertEquals("otherField1", returnType.getFieldNames()[0]);
-		assertEquals("otherField2", returnType.getFieldNames()[1]);
-		assertEquals("rowtime", returnType.getFieldNames()[2]);
-		assertEquals("otherField3", returnType.getFieldNames()[3]);
-		assertEquals("otherField4", returnType.getFieldNames()[4]);
-		assertEquals("otherField5", returnType.getFieldNames()[5]);
-		// check field types
-		assertEquals(Types.LONG(), returnType.getTypeAt(0));
-		assertEquals(Types.STRING(), returnType.getTypeAt(1));
-		assertEquals(Types.LONG(), returnType.getTypeAt(2));
-		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
-		assertEquals(Types.BYTE(), returnType.getTypeAt(4));
-		assertEquals(Types.INT(), returnType.getTypeAt(5));
-
-		// check field mapping
-		Map<String, String> fieldMapping = source.getFieldMapping();
-		assertNotNull(fieldMapping);
-		assertEquals(3, fieldMapping.size());
-		assertEquals("otherField1", fieldMapping.get("field1"));
-		assertEquals("otherField2", fieldMapping.get("field2"));
-		assertEquals("otherField3", fieldMapping.get("field3"));
-	}
-
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java
deleted file mode 100644
index dbbb10f51bf..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
-
-import org.junit.Test;
-
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Abstract test base for all Kafka table sink tests.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sinks.
- */
-@Deprecated
-public abstract class KafkaTableSinkBaseTestBase {
-
-	private static final String TOPIC = "testTopic";
-	private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
-	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
-	private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
-	private static final Properties PROPERTIES = createSinkProperties();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testKafkaTableSink() {
-		DataStream dataStream = mock(DataStream.class);
-		when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
-
-		KafkaTableSinkBase kafkaTableSink = spy(createTableSink());
-		kafkaTableSink.emitDataStream(dataStream);
-
-		// verify correct producer class
-		verify(dataStream).addSink(any(getProducerClass()));
-
-		// verify correctly configured producer
-		verify(kafkaTableSink).createKafkaProducer(
-			eq(TOPIC),
-			eq(PROPERTIES),
-			any(getSerializationSchemaClass()),
-			eq(Optional.of(PARTITIONER)));
-	}
-
-	@Test
-	public void testConfiguration() {
-		KafkaTableSinkBase kafkaTableSink = createTableSink();
-		KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
-		assertNotSame(kafkaTableSink, newKafkaTableSink);
-
-		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
-		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
-		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
-	}
-
-	protected abstract KafkaTableSinkBase createTableSink(
-		String topic,
-		Properties properties,
-		FlinkKafkaPartitioner<Row> partitioner);
-
-	protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
-
-	protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
-
-	private KafkaTableSinkBase createTableSink() {
-		KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
-		return sink.configure(FIELD_NAMES, FIELD_TYPES);
-	}
-
-	private static Properties createSinkProperties() {
-		Properties properties = new Properties();
-		properties.setProperty("bootstrap.servers", "localhost:12345");
-		return properties;
-	}
-
-	private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
-		@Override
-		public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-			return 0;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java
deleted file mode 100644
index f778ba6236b..00000000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-import org.apache.flink.types.Row;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Abstract test base for all format-specific Kafka table sources with builders.
- *
- * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
- *             drop support for format-specific table sources.
- */
-@Deprecated
-public abstract class KafkaTableSourceBuilderTestBase {
-
-	static final String[] FIELD_NAMES =
-		new String[]{"field1", "field2", "time1", "time2", "field3"};
-	static final TypeInformation[] FIELD_TYPES =
-		new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.DOUBLE()};
-
-	private static final String TOPIC = "testTopic";
-	private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
-	private static final Properties PROPS = createSourceProperties();
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testKafkaConsumer() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-
-		// assert that the correct consumer is created and configured
-		KafkaTableSourceBase observed = spy(b.build());
-		StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
-		when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class));
-		observed.getDataStream(env);
-
-		verify(env).addSource(any(getFlinkKafkaConsumer()));
-
-		verify(observed).getKafkaConsumer(
-			eq(TOPIC),
-			eq(PROPS),
-			any(getDeserializationSchema()));
-	}
-
-	@Test
-	public void testTableSchema() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-
-		KafkaTableSourceBase source = b.build();
-
-		// check table schema
-		TableSchema schema = source.getTableSchema();
-		assertNotNull(schema);
-		assertEquals(5, schema.getFieldNames().length);
-		// check table fields
-		assertEquals("field1", schema.getFieldNames()[0]);
-		assertEquals("field2", schema.getFieldNames()[1]);
-		assertEquals("time1", schema.getFieldNames()[2]);
-		assertEquals("time2", schema.getFieldNames()[3]);
-		assertEquals("field3", schema.getFieldNames()[4]);
-		assertEquals(Types.LONG(), schema.getFieldTypes()[0]);
-		assertEquals(Types.STRING(), schema.getFieldTypes()[1]);
-		assertEquals(Types.SQL_TIMESTAMP(), schema.getFieldTypes()[2]);
-		assertEquals(Types.SQL_TIMESTAMP(), schema.getFieldTypes()[3]);
-		assertEquals(Types.DOUBLE(), schema.getFieldTypes()[4]);
-	}
-
-	@Test
-	public void testNoTimeAttributes() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-
-		KafkaTableSourceBase source = b.build();
-
-		// assert no proctime
-		assertNull(source.getProctimeAttribute());
-		// assert no rowtime
-		assertNotNull(source.getRowtimeAttributeDescriptors());
-		assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
-	}
-
-	@Test
-	public void testProctimeAttribute() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-		b.withProctimeAttribute("time1");
-
-		KafkaTableSourceBase source = b.build();
-
-		// assert correct proctime field
-		assertEquals("time1", source.getProctimeAttribute());
-
-		// assert no rowtime
-		assertNotNull(source.getRowtimeAttributeDescriptors());
-		assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
-	}
-
-	@Test
-	public void testRowtimeAttribute() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-		b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
-
-		KafkaTableSourceBase source = b.build();
-
-		// assert no proctime
-		assertNull(source.getProctimeAttribute());
-
-		// assert correct rowtime descriptor
-		List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
-		assertNotNull(descs);
-		assertEquals(1, descs.size());
-		RowtimeAttributeDescriptor desc = descs.get(0);
-		assertEquals("time2", desc.getAttributeName());
-		// assert timestamp extractor
-		assertTrue(desc.getTimestampExtractor() instanceof ExistingField);
-		assertEquals(1, desc.getTimestampExtractor().getArgumentFields().length);
-		assertEquals("time2", desc.getTimestampExtractor().getArgumentFields()[0]);
-		// assert watermark strategy
-		assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
-	}
-
-	@Test
-	public void testRowtimeAttribute2() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-
-		try {
-			b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
-
-			KafkaTableSourceBase source = b.build();
-
-			// assert no proctime
-			assertNull(source.getProctimeAttribute());
-
-			// assert correct rowtime descriptor
-			List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
-			assertNotNull(descs);
-			assertEquals(1, descs.size());
-			RowtimeAttributeDescriptor desc = descs.get(0);
-			assertEquals("time2", desc.getAttributeName());
-			// assert timestamp extractor
-			assertTrue(desc.getTimestampExtractor() instanceof StreamRecordTimestamp);
-			assertTrue(desc.getTimestampExtractor().getArgumentFields().length == 0);
-			// assert watermark strategy
-			assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
-		} catch (Exception e) {
-			if (b.supportsKafkaTimestamps()) {
-				// builder should support Kafka timestamps
-				fail();
-			}
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testConsumerOffsets() {
-		KafkaTableSourceBase.Builder b = getBuilder();
-		configureBuilder(b);
-
-		// test the default behavior
-		KafkaTableSourceBase source = spy(b.build());
-		when(source.createKafkaConsumer(TOPIC, PROPS, null))
-				.thenReturn(mock(getFlinkKafkaConsumer()));
-
-		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
-
-		// test reading from earliest
-		b.fromEarliest();
-		source = spy(b.build());
-		when(source.createKafkaConsumer(TOPIC, PROPS, null))
-				.thenReturn(mock(getFlinkKafkaConsumer()));
-
-		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();
-
-		// test reading from latest
-		b.fromLatest();
-		source = spy(b.build());
-		when(source.createKafkaConsumer(TOPIC, PROPS, null))
-				.thenReturn(mock(getFlinkKafkaConsumer()));
-		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();
-
-		// test reading from group offsets
-		b.fromGroupOffsets();
-		source = spy(b.build());
-		when(source.createKafkaConsumer(TOPIC, PROPS, null))
-				.thenReturn(mock(getFlinkKafkaConsumer()));
-		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
-
-		// test reading from given offsets
-		b.fromSpecificOffsets(mock(Map.class));
-		source = spy(b.build());
-		when(source.createKafkaConsumer(TOPIC, PROPS, null))
-				.thenReturn(mock(getFlinkKafkaConsumer()));
-		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
-	}
-
-	protected abstract KafkaTableSourceBase.Builder getBuilder();
-
-	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
-
-	protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
-
-	protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
-		builder
-			.forTopic(TOPIC)
-			.withKafkaProperties(PROPS)
-			.withSchema(SCHEMA);
-	}
-
-	private static Properties createSourceProperties() {
-		Properties properties = new Properties();
-		properties.setProperty("zookeeper.connect", "dummy");
-		properties.setProperty("group.id", "dummy");
-		return properties;
-	}
-
-}
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 63d43994687..53760089cb1 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -85,22 +85,6 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-avro. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<!-- Projects depending on this project, won't depend on flink-json. -->
-			<optional>true</optional>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>
@@ -111,23 +95,6 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-json</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>


 
