Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/23 16:18:32 UTC

[1/2] flink git commit: [FLINK-9846] [table] Add a Kafka table sink factory

Repository: flink
Updated Branches:
  refs/heads/master 9e348d32c -> 57b3cde86
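
For context, the sink factory added by this commit is not instantiated directly but discovered through the table descriptor API. A minimal usage sketch follows (assumptions: the connect()/registerTableSink() descriptor methods of the same release and the Json format descriptor from flink-json; any format backed by a SerializationSchemaFactory works; descriptors come from org.apache.flink.table.descriptors.*):

	StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

	tableEnv.connect(
			new Kafka()
				.version("0.9")                      // matches CONNECTOR_VERSION_VALUE_09
				.topic("myTopic")
				.properties(kafkaProperties))        // standard Kafka client properties
		.withFormat(new Json().deriveSchema())       // assumed format descriptor
		.withSchema(
			new Schema()
				.field("name", Types.STRING())
				.field("count", Types.DECIMAL()))
		.inAppendMode()                              // the factory requires append mode
		.registerTableSink("kafkaSink");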


http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
deleted file mode 100644
index b976e14..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka09TableSource} created by {@link Kafka09TableSourceFactory}.
- */
-public class Kafka09TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-	@Override
-	protected String getKafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer09.class;
-	}
-
-	@Override
-	protected KafkaTableSource getExpectedKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka09TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..a6c8bd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka09TableSource} and {@link Kafka09TableSink} created
+ * by {@link Kafka09TableSourceSinkFactory}.
+ */
+public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer09.class;
+	}
+
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer09.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka09TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka09TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ec27398..231eddd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -29,7 +29,10 @@ import java.util.Properties;
 
 /**
  * Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use table descriptors instead of implementation-specific classes.
  */
+@Deprecated
 @Internal
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
 
@@ -39,7 +42,9 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 687df58..7853bb7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.util.TableConnectorUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -40,27 +45,59 @@ import java.util.Properties;
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
+	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
+
+	/** The schema of the table. */
+	private final Optional<TableSchema> schema;
+
+	/** The Kafka topic to write to. */
 	protected final String topic;
+
+	/** Properties for the Kafka producer. */
 	protected final Properties properties;
-	protected SerializationSchema<Row> serializationSchema;
+
+	/** Serialization schema for encoding records to Kafka. */
+	protected Optional<SerializationSchema<Row>> serializationSchema;
+
+	/** Partitioner to select Kafka partition for each item. */
 	protected final FlinkKafkaPartitioner<Row> partitioner;
+
+	// legacy variables
 	protected String[] fieldNames;
 	protected TypeInformation[] fieldTypes;
 
+	protected KafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
+		this.serializationSchema = Optional.of(Preconditions.checkNotNull(
+			serializationSchema, "Serialization schema must not be null."));
+	}
+
 	/**
 	 * Creates KafkaTableSink.
 	 *
 	 * @param topic                 Kafka topic to write to.
-	 * @param properties            Properties for the Kafka consumer.
+	 * @param properties            Properties for the Kafka producer.
 	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public KafkaTableSink(
 			String topic,
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
+		this.schema = Optional.empty();
 		this.topic = Preconditions.checkNotNull(topic, "topic");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
 		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+		this.serializationSchema = Optional.empty();
 	}
 
 	/**
@@ -72,8 +109,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param partitioner         Partitioner to select Kafka partition.
 	 * @return The version-specific Kafka producer
 	 */
-	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-		String topic, Properties properties,
+	protected abstract SinkFunction<Row> createKafkaProducer(
+		String topic,
+		Properties properties,
 		SerializationSchema<Row> serializationSchema,
 		FlinkKafkaPartitioner<Row> partitioner);
 
@@ -82,40 +120,57 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 *
 	 * @param rowSchema the schema of the row to serialize.
 	 * @return Instance of serialization schema
+	 * @deprecated Use the constructor to pass a serialization schema instead.
 	 */
-	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+	@Deprecated
+	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}
 
 	/**
 	 * Create a deep copy of this sink.
 	 *
 	 * @return Deep copy of this sink
 	 */
-	protected abstract KafkaTableSink createCopy();
+	@Deprecated
+	protected KafkaTableSink createCopy() {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}
 
 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
-		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
-		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
-		kafkaProducer.setFlushOnCheckpoint(true);
+		SinkFunction<Row> kafkaProducer = createKafkaProducer(
+			topic,
+			properties,
+			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
+			partitioner);
 		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
 	}
 
 	@Override
 	public TypeInformation<Row> getOutputType() {
-		return new RowTypeInfo(getFieldTypes());
+		return schema
+			.map(TableSchema::toRowType)
+			.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
 	}
 
 	public String[] getFieldNames() {
-		return fieldNames;
+		return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
 	}
 
 	@Override
 	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+		return schema.map(TableSchema::getTypes).orElse(fieldTypes);
 	}
 
 	@Override
 	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		if (schema.isPresent()) {
+			// a fixed schema is defined so reconfiguration is not supported
+			throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
+		}
+
+		// legacy code
 		KafkaTableSink copy = createCopy();
 		copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
 		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
@@ -123,8 +178,39 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 			"Number of provided field names and types does not match.");
 
 		RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
-		copy.serializationSchema = createSerializationSchema(rowSchema);
+		copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema));
 
 		return copy;
 	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		KafkaTableSink that = (KafkaTableSink) o;
+		return Objects.equals(schema, that.schema) &&
+			Objects.equals(topic, that.topic) &&
+			Objects.equals(properties, that.properties) &&
+			Objects.equals(serializationSchema, that.serializationSchema) &&
+			Objects.equals(partitioner, that.partitioner) &&
+			Arrays.equals(fieldNames, that.fieldNames) &&
+			Arrays.equals(fieldTypes, that.fieldTypes);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = Objects.hash(
+			schema,
+			topic,
+			properties,
+			serializationSchema,
+			partitioner);
+		result = 31 * result + Arrays.hashCode(fieldNames);
+		result = 31 * result + Arrays.hashCode(fieldTypes);
+		return result;
+	}
 }
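
With this refactoring, a version-specific sink only forwards the new constructor and implements createKafkaProducer(). A sketch of what the 0.9 subclass plausibly looks like (Kafka09TableSink itself is not shown in this part of the mail; the producer wiring below is an assumption based on the removed emitDataStream() logic):

	// in org.apache.flink.streaming.connectors.kafka (imports as in the files above)
	public class Kafka09TableSink extends KafkaTableSink {

		public Kafka09TableSink(
				TableSchema schema,
				String topic,
				Properties properties,
				FlinkKafkaPartitioner<Row> partitioner,
				SerializationSchema<Row> serializationSchema) {
			super(schema, topic, properties, partitioner, serializationSchema);
		}

		@Override
		protected SinkFunction<Row> createKafkaProducer(
				String topic,
				Properties properties,
				SerializationSchema<Row> serializationSchema,
				FlinkKafkaPartitioner<Row> partitioner) {
			FlinkKafkaProducer09<Row> producer = new FlinkKafkaProducer09<>(
				topic, serializationSchema, properties, partitioner);
			// flush on checkpoint to keep the previous at-least-once behavior
			// (formerly hard-coded in the base class's emitDataStream())
			producer.setFlushOnCheckpoint(true);
			return producer;
		}
	}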

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
deleted file mode 100644
index d7e42f5..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-
-/**
- * Factory for creating configured instances of {@link KafkaTableSource}.
- */
-public abstract class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> {
-
-	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
-		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
-		context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
-		context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
-		return context;
-	}
-
-	@Override
-	public List<String> supportedProperties() {
-		List<String> properties = new ArrayList<>();
-
-		// kafka
-		properties.add(CONNECTOR_TOPIC);
-		properties.add(CONNECTOR_PROPERTIES);
-		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
-		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
-		properties.add(CONNECTOR_STARTUP_MODE);
-		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
-		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
-
-		// schema
-		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
-		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
-		properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
-
-		// time attributes
-		properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
-		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
-		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
-
-		// format wildcard
-		properties.add(FORMAT() + ".*");
-
-		return properties;
-	}
-
-	@Override
-	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
-		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
-
-		// validate
-		// allow Kafka timestamps to be used, watermarks can not be received from source
-		new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
-		new KafkaValidator().validate(params);
-
-		// deserialization schema using format discovery
-		final DeserializationSchemaFactory<?> formatFactory = TableFactoryService.find(
-			DeserializationSchemaFactory.class,
-			properties,
-			this.getClass().getClassLoader());
-		@SuppressWarnings("unchecked")
-		final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
-			.createDeserializationSchema(properties);
-
-		// schema
-		final TableSchema schema = params.getTableSchema(SCHEMA());
-
-		// proctime
-		final Optional<String> proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params);
-
-		// rowtime
-		final List<RowtimeAttributeDescriptor> rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(params);
-
-		// field mapping
-		final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(params, Optional.of(schema));
-
-		// properties
-		final Properties kafkaProperties = new Properties();
-		final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
-			CONNECTOR_PROPERTIES,
-			Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
-		propsList.forEach(kv -> kafkaProperties.put(
-			params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
-			params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
-		));
-
-		// topic
-		final String topic = params.getString(CONNECTOR_TOPIC);
-
-		// startup mode
-		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-		final StartupMode startupMode = params
-			.getOptionalString(CONNECTOR_STARTUP_MODE)
-			.map(modeString -> {
-				switch (modeString) {
-					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
-						return StartupMode.EARLIEST;
-
-					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
-						return StartupMode.LATEST;
-
-					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
-						return StartupMode.GROUP_OFFSETS;
-
-					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
-						final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
-							CONNECTOR_SPECIFIC_OFFSETS,
-							Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-						offsetList.forEach(kv -> {
-							final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
-							final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-							final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
-							specificOffsets.put(topicPartition, offset);
-						});
-						return StartupMode.SPECIFIC_OFFSETS;
-					default:
-						throw new TableException("Unsupported startup mode. Validator should have checked that.");
-				}
-			}).orElse(StartupMode.GROUP_OFFSETS);
-
-		return createKafkaTableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributes,
-			fieldMapping,
-			topic,
-			kafkaProperties,
-			deserializationSchema,
-			startupMode,
-			specificOffsets);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// For version-specific factories
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns the Kafka version.
-	 */
-	protected abstract String kafkaVersion();
-
-	/**
-	 * True if the Kafka source supports Kafka timestamps, false otherwise.
-	 *
-	 * @return True if the Kafka source supports Kafka timestamps, false otherwise.
-	 */
-	protected abstract boolean supportsKafkaTimestamps();
-
-	/**
-	 * Constructs the version-specific Kafka table source.
-	 *
-	 * @param schema                      Schema of the produced table.
-	 * @param proctimeAttribute           Field name of the processing time attribute.
-	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
-	 * @param fieldMapping                Mapping for the fields of the table schema to
-	 *                                    fields of the physical returned type.
-	 * @param topic                       Kafka topic to consume.
-	 * @param properties                  Properties for the Kafka consumer.
-	 * @param deserializationSchema       Deserialization schema for decoding records from Kafka.
-	 * @param startupMode                 Startup mode for the contained consumer.
-	 * @param specificStartupOffsets      Specific startup offsets; only relevant when startup
-	 *                                    mode is {@link StartupMode#SPECIFIC_OFFSETS}.
-	 */
-	protected abstract KafkaTableSource createKafkaTableSource(
-		TableSchema schema,
-		Optional<String> proctimeAttribute,
-		List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-		Map<String, String> fieldMapping,
-		String topic, Properties properties,
-		DeserializationSchema<Row> deserializationSchema,
-		StartupMode startupMode,
-		Map<KafkaTopicPartition, Long> specificStartupOffsets);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
new file mode 100644
index 0000000..3307994
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory for creating configured instances of {@link KafkaTableSource} and {@link KafkaTableSink}.
+ */
+public abstract class KafkaTableSourceSinkFactoryBase implements
+		StreamTableSourceFactory<Row>,
+		StreamTableSinkFactory<Row> {
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
+		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+		context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
+		context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		// kafka
+		properties.add(CONNECTOR_TOPIC);
+		properties.add(CONNECTOR_PROPERTIES);
+		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
+		properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
+		properties.add(CONNECTOR_STARTUP_MODE);
+		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+		properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+
+		// schema
+		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+		properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
+
+		// time attributes
+		properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
+		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
+
+		// format wildcard
+		properties.add(FORMAT() + ".*");
+
+		return properties;
+	}
+
+	@Override
+	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+		final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+		final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+		final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
+
+		return createKafkaTableSource(
+			schema,
+			SchemaValidator.deriveProctimeAttribute(descriptorProperties),
+			SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
+			SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+			topic,
+			getKafkaProperties(descriptorProperties),
+			getDeserializationSchema(properties),
+			startupOptions.startupMode,
+			startupOptions.specificOffsets);
+	}
+
+	@Override
+	public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+		final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+		final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(descriptorProperties);
+		final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
+			SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
+
+		// see also FLINK-9870
+		if (proctime.isPresent() || !rowtimeAttributeDescriptors.isEmpty() ||
+				checkForCustomFieldMapping(descriptorProperties, schema)) {
+			throw new TableException("Time attributes and custom field mappings are not supported yet.");
+		}
+
+		return createKafkaTableSink(
+			schema,
+			topic,
+			getKafkaProperties(descriptorProperties),
+			getFlinkKafkaPartitioner(),
+			getSerializationSchema(properties));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// For version-specific factories
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the Kafka version.
+	 */
+	protected abstract String kafkaVersion();
+
+	/**
+	 * True if the Kafka source supports Kafka timestamps, false otherwise.
+	 *
+	 * @return True if the Kafka source supports Kafka timestamps, false otherwise.
+	 */
+	protected abstract boolean supportsKafkaTimestamps();
+
+	/**
+	 * Constructs the version-specific Kafka table source.
+	 *
+	 * @param schema                      Schema of the produced table.
+	 * @param proctimeAttribute           Field name of the processing time attribute.
+	 * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
+	 * @param fieldMapping                Mapping for the fields of the table schema to
+	 *                                    fields of the physical returned type.
+	 * @param topic                       Kafka topic to consume.
+	 * @param properties                  Properties for the Kafka consumer.
+	 * @param deserializationSchema       Deserialization schema for decoding records from Kafka.
+	 * @param startupMode                 Startup mode for the contained consumer.
+	 * @param specificStartupOffsets      Specific startup offsets; only relevant when startup
+	 *                                    mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 */
+	protected abstract KafkaTableSource createKafkaTableSource(
+		TableSchema schema,
+		Optional<String> proctimeAttribute,
+		List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+		Map<String, String> fieldMapping,
+		String topic,
+		Properties properties,
+		DeserializationSchema<Row> deserializationSchema,
+		StartupMode startupMode,
+		Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+	/**
+	 * Constructs the version-specific Kafka table sink.
+	 *
+	 * @param schema      Schema of the produced table.
+	 * @param topic       Kafka topic to write to.
+	 * @param properties  Properties for the Kafka producer.
+	 * @param partitioner Partitioner to select Kafka partition for each item.
+	 */
+	protected abstract KafkaTableSink createKafkaTableSink(
+		TableSchema schema,
+		String topic,
+		Properties properties,
+		FlinkKafkaPartitioner<Row> partitioner,
+		SerializationSchema<Row> serializationSchema);
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(properties);
+
+		// allow Kafka timestamps to be used, watermarks can not be received from source
+		new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(descriptorProperties);
+		new KafkaValidator().validate(descriptorProperties);
+
+		return descriptorProperties;
+	}
+
+	private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) {
+		@SuppressWarnings("unchecked")
+		final DeserializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
+			DeserializationSchemaFactory.class,
+			properties,
+			this.getClass().getClassLoader());
+		return formatFactory.createDeserializationSchema(properties);
+	}
+
+	private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
+		@SuppressWarnings("unchecked")
+		final SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
+			SerializationSchemaFactory.class,
+			properties,
+			this.getClass().getClassLoader());
+		return formatFactory.createSerializationSchema(properties);
+	}
+
+	private Properties getKafkaProperties(DescriptorProperties descriptorProperties) {
+		final Properties kafkaProperties = new Properties();
+		final List<Map<String, String>> propsList = descriptorProperties.getFixedIndexedProperties(
+			CONNECTOR_PROPERTIES,
+			Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
+		propsList.forEach(kv -> kafkaProperties.put(
+			descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+			descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+		));
+		return kafkaProperties;
+	}
+
+	private StartupOptions getStartupOptions(
+			DescriptorProperties descriptorProperties,
+			String topic) {
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		final StartupMode startupMode = descriptorProperties
+			.getOptionalString(CONNECTOR_STARTUP_MODE)
+			.map(modeString -> {
+				switch (modeString) {
+					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
+						return StartupMode.EARLIEST;
+
+					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
+						return StartupMode.LATEST;
+
+					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+						return StartupMode.GROUP_OFFSETS;
+
+					case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+						final List<Map<String, String>> offsetList = descriptorProperties.getFixedIndexedProperties(
+							CONNECTOR_SPECIFIC_OFFSETS,
+							Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+						offsetList.forEach(kv -> {
+							final int partition = descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+							final long offset = descriptorProperties.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+							final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+							specificOffsets.put(topicPartition, offset);
+						});
+						return StartupMode.SPECIFIC_OFFSETS;
+					default:
+						throw new TableException("Unsupported startup mode. Validator should have checked that.");
+				}
+			}).orElse(StartupMode.GROUP_OFFSETS);
+		final StartupOptions options = new StartupOptions();
+		options.startupMode = startupMode;
+		options.specificOffsets = specificOffsets;
+		return options;
+	}
+
+	private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
+		// we do not support a custom partitioner yet
+		return new FlinkFixedPartitioner<>();
+	}
+
+	private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
+		final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+		return fieldMapping.size() != schema.getColumnNames().length ||
+			!fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
+	}
+
+	private static class StartupOptions {
+		private StartupMode startupMode;
+		private Map<KafkaTopicPartition, Long> specificOffsets;
+	}
+}
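
A concrete factory only contributes the version string and the two constructors, and is picked up by TableFactoryService via Java's ServiceLoader (assuming a META-INF/services entry for org.apache.flink.table.factories.TableFactory, as for other factories of this release). A sketch of the 0.9 factory under those assumptions (the real Kafka09TableSourceSinkFactory is referenced by the test above but not shown in this part of the mail; the source construction mirrors the expected values in the test):

	// in org.apache.flink.streaming.connectors.kafka (imports as in the files above)
	public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {

		@Override
		protected String kafkaVersion() {
			return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
		}

		@Override
		protected boolean supportsKafkaTimestamps() {
			// record timestamps only exist since Kafka 0.10
			return false;
		}

		@Override
		protected KafkaTableSource createKafkaTableSource(
				TableSchema schema,
				Optional<String> proctimeAttribute,
				List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
				Map<String, String> fieldMapping,
				String topic,
				Properties properties,
				DeserializationSchema<Row> deserializationSchema,
				StartupMode startupMode,
				Map<KafkaTopicPartition, Long> specificStartupOffsets) {
			return new Kafka09TableSource(
				schema,
				proctimeAttribute,
				rowtimeAttributeDescriptors,
				Optional.of(fieldMapping),
				topic,
				properties,
				deserializationSchema,
				startupMode,
				specificStartupOffsets);
		}

		@Override
		protected KafkaTableSink createKafkaTableSink(
				TableSchema schema,
				String topic,
				Properties properties,
				FlinkKafkaPartitioner<Row> partitioner,
				SerializationSchema<Row> serializationSchema) {
			return new Kafka09TableSink(schema, topic, properties, partitioner, serializationSchema);
		}
	}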

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index 6e83ddd..7e0d1fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -74,4 +74,14 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
 		return partitions[parallelInstanceId % partitions.length];
 	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || o instanceof FlinkFixedPartitioner;
+	}
+
+	@Override
+	public int hashCode() {
+		return FlinkFixedPartitioner.class.hashCode();
+	}
 }
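
These equality semantics were added because KafkaTableSink.equals() above compares partitioners, and the factory tests assert full equality between the expected and the discovered sink. Since the fixed partitioner is a stateless strategy (the fields set in open() are per-subtask runtime state, not identity), any two instances are treated as interchangeable:

	// holds for any two instances; runtime state from open() is ignored
	new FlinkFixedPartitioner<Row>().equals(new FlinkFixedPartitioner<Row>());  // true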

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index a87c622..946b6eb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -44,7 +44,11 @@ import static org.mockito.Mockito.when;
 
 /**
  * Abstract test base for all Kafka table sink tests.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public abstract class KafkaTableSinkTestBase {
 
 	private static final String TOPIC = "testTopic";
@@ -94,7 +98,8 @@ public abstract class KafkaTableSinkTestBase {
 	protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
 	private KafkaTableSink createTableSink() {
-		return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+		KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+		return sink.configure(FIELD_NAMES, FIELD_TYPES);
 	}
 
 	private static Properties createSinkProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
deleted file mode 100644
index 96f1607..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Kafka;
-import org.apache.flink.table.descriptors.Rowtime;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableDescriptor;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.factories.utils.TestDeserializationSchema;
-import org.apache.flink.table.factories.utils.TestTableFormat;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test base for {@link KafkaTableSourceFactory}.
- */
-public abstract class KafkaTableSourceFactoryTestBase extends TestLogger {
-
-	private static final String TOPIC = "myTopic";
-	private static final int PARTITION_0 = 0;
-	private static final long OFFSET_0 = 100L;
-	private static final int PARTITION_1 = 1;
-	private static final long OFFSET_1 = 123L;
-	private static final String FRUIT_NAME = "fruit-name";
-	private static final String NAME = "name";
-	private static final String COUNT = "count";
-	private static final String TIME = "time";
-	private static final String EVENT_TIME = "event-time";
-	private static final String PROC_TIME = "proc-time";
-	private static final Properties KAFKA_PROPERTIES = new Properties();
-	static {
-		KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
-		KAFKA_PROPERTIES.setProperty("group.id", "dummy");
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testTableSource() {
-
-		// prepare parameters for Kafka table source
-
-		final TableSchema schema = TableSchema.builder()
-			.field(FRUIT_NAME, Types.STRING())
-			.field(COUNT, Types.DECIMAL())
-			.field(EVENT_TIME, Types.SQL_TIMESTAMP())
-			.field(PROC_TIME, Types.SQL_TIMESTAMP())
-			.build();
-
-		final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
-			new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
-
-		final Map<String, String> fieldMapping = new HashMap<>();
-		fieldMapping.put(FRUIT_NAME, NAME);
-		fieldMapping.put(COUNT, COUNT);
-
-		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
-		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
-
-		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-			TableSchema.builder()
-				.field(NAME, Types.STRING())
-				.field(COUNT, Types.DECIMAL())
-				.field(TIME, Types.SQL_TIMESTAMP())
-				.build()
-				.toRowType()
-		);
-
-		final StartupMode startupMode = StartupMode.SPECIFIC_OFFSETS;
-
-		final KafkaTableSource expected = getExpectedKafkaTableSource(
-			schema,
-			Optional.of(PROC_TIME),
-			rowtimeAttributeDescriptors,
-			fieldMapping,
-			TOPIC,
-			KAFKA_PROPERTIES,
-			deserializationSchema,
-			startupMode,
-			specificOffsets);
-
-		// construct table source using descriptors and table source factory
-
-		final Map<Integer, Long> offsets = new HashMap<>();
-		offsets.put(PARTITION_0, OFFSET_0);
-		offsets.put(PARTITION_1, OFFSET_1);
-
-		final TestTableDescriptor testDesc = new TestTableDescriptor(
-				new Kafka()
-					.version(getKafkaVersion())
-					.topic(TOPIC)
-					.properties(KAFKA_PROPERTIES)
-					.startFromSpecificOffsets(offsets))
-			.withFormat(new TestTableFormat())
-			.withSchema(
-				new Schema()
-					.field(FRUIT_NAME, Types.STRING()).from(NAME)
-					.field(COUNT, Types.DECIMAL()) // no from so it must match with the input
-					.field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
-						new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
-					.field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
-			.inAppendMode();
-		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-		testDesc.addProperties(descriptorProperties);
-		final Map<String, String> propertiesMap = descriptorProperties.asMap();
-
-		final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
-			.createStreamTableSource(propertiesMap);
-
-		assertEquals(expected, actualSource);
-
-		// test Kafka consumer
-		final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
-		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
-		actualKafkaSource.getDataStream(mock);
-		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.function.getClass()));
-	}
-
-	private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
-
-		public SourceFunction<?> function;
-
-		@Override
-		public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-			this.function = function;
-			return super.addSource(function);
-		}
-
-		@Override
-		public JobExecutionResult execute(String jobName) {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// For version-specific tests
-	// --------------------------------------------------------------------------------------------
-
-	protected abstract String getKafkaVersion();
-
-	protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
-
-	protected abstract KafkaTableSource getExpectedKafkaTableSource(
-		TableSchema schema,
-		Optional<String> proctimeAttribute,
-		List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-		Map<String, String> fieldMapping,
-		String topic, Properties properties,
-		DeserializationSchema<Row> deserializationSchema,
-		StartupMode startupMode,
-		Map<KafkaTopicPartition, Long> specificStartupOffsets);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
new file mode 100644
index 0000000..d8e8f7d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.utils.TestDeserializationSchema;
+import org.apache.flink.table.factories.utils.TestSerializationSchema;
+import org.apache.flink.table.factories.utils.TestTableFormat;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Abstract test base for {@link KafkaTableSourceSinkFactoryBase}.
+ */
+public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
+
+	private static final String TOPIC = "myTopic";
+	private static final int PARTITION_0 = 0;
+	private static final long OFFSET_0 = 100L;
+	private static final int PARTITION_1 = 1;
+	private static final long OFFSET_1 = 123L;
+	private static final String FRUIT_NAME = "fruit-name";
+	private static final String NAME = "name";
+	private static final String COUNT = "count";
+	private static final String TIME = "time";
+	private static final String EVENT_TIME = "event-time";
+	private static final String PROC_TIME = "proc-time";
+	private static final Properties KAFKA_PROPERTIES = new Properties();
+	static {
+		KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
+		KAFKA_PROPERTIES.setProperty("group.id", "dummy");
+		KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+	}
+
+	private static final Map<Integer, Long> OFFSETS = new HashMap<>();
+	static {
+		OFFSETS.put(PARTITION_0, OFFSET_0);
+		OFFSETS.put(PARTITION_1, OFFSET_1);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTableSource() {
+
+		// prepare parameters for Kafka table source
+
+		final TableSchema schema = TableSchema.builder()
+			.field(FRUIT_NAME, Types.STRING())
+			.field(COUNT, Types.DECIMAL())
+			.field(EVENT_TIME, Types.SQL_TIMESTAMP())
+			.field(PROC_TIME, Types.SQL_TIMESTAMP())
+			.build();
+
+		final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+			new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+		final Map<String, String> fieldMapping = new HashMap<>();
+		fieldMapping.put(FRUIT_NAME, NAME);
+		fieldMapping.put(COUNT, COUNT);
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
+			TableSchema.builder()
+				.field(NAME, Types.STRING())
+				.field(COUNT, Types.DECIMAL())
+				.field(TIME, Types.SQL_TIMESTAMP())
+				.build()
+				.toRowType()
+		);
+
+		final KafkaTableSource expected = getExpectedKafkaTableSource(
+			schema,
+			Optional.of(PROC_TIME),
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			TOPIC,
+			KAFKA_PROPERTIES,
+			deserializationSchema,
+			StartupMode.SPECIFIC_OFFSETS,
+			specificOffsets);
+
+		// construct table source using descriptors and table source factory
+
+		final TestTableDescriptor testDesc = new TestTableDescriptor(
+				new Kafka()
+					.version(getKafkaVersion())
+					.topic(TOPIC)
+					.properties(KAFKA_PROPERTIES)
+					.startFromSpecificOffsets(OFFSETS))
+			.withFormat(new TestTableFormat())
+			.withSchema(
+				new Schema()
+					.field(FRUIT_NAME, Types.STRING()).from(NAME)
+					.field(COUNT, Types.DECIMAL()) // no 'from' declared, so it must match the input field
+					.field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
+						new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
+					.field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
+			.inAppendMode();
+
+		final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+		final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+			.createStreamTableSource(propertiesMap);
+
+		assertEquals(expected, actualSource);
+
+		// test Kafka consumer
+		final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
+		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+		actualKafkaSource.getDataStream(mock);
+		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+	}
+
+	/**
+	 * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
+	 */
+	@Test
+	public void testTableSink() {
+		// prepare parameters for Kafka table sink
+
+		final TableSchema schema = TableSchema.builder()
+			.field(FRUIT_NAME, Types.STRING())
+			.field(COUNT, Types.DECIMAL())
+			.field(EVENT_TIME, Types.SQL_TIMESTAMP())
+			.build();
+
+		final KafkaTableSink expected = getExpectedKafkaTableSink(
+			schema,
+			TOPIC,
+			KAFKA_PROPERTIES,
+			new FlinkFixedPartitioner<>(), // a custom partitioner is not supported yet
+			new TestSerializationSchema(schema.toRowType()));
+
+		// construct table sink using descriptors and table sink factory
+
+		final TestTableDescriptor testDesc = new TestTableDescriptor(
+				new Kafka()
+					.version(getKafkaVersion())
+					.topic(TOPIC)
+					.properties(KAFKA_PROPERTIES)
+					.startFromSpecificOffsets(OFFSETS)) // test that these are accepted even though not needed
+			.withFormat(new TestTableFormat())
+			.withSchema(
+				new Schema()
+					.field(FRUIT_NAME, Types.STRING())
+					.field(COUNT, Types.DECIMAL())
+					.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
+			.inAppendMode();
+
+		final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+		final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
+			.createStreamTableSink(propertiesMap);
+
+		assertEquals(expected, actualSink);
+
+		// test Kafka producer
+		final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink;
+		final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
+		actualKafkaSink.emitDataStream(streamMock);
+		assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
+	}
+
+	private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
+
+		public SourceFunction<?> sourceFunction;
+
+		@Override
+		public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
+			this.sourceFunction = sourceFunction;
+			return super.addSource(sourceFunction);
+		}
+
+		@Override
+		public JobExecutionResult execute(String jobName) {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static class DataStreamMock extends DataStream<Row> {
+
+		public SinkFunction<?> sinkFunction;
+
+		public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Row> outType) {
+			super(environment, new StreamTransformationMock("name", outType, 1));
+		}
+
+		@Override
+		public DataStreamSink<Row> addSink(SinkFunction<Row> sinkFunction) {
+			this.sinkFunction = sinkFunction;
+			return super.addSink(sinkFunction);
+		}
+	}
+
+	private static class StreamTransformationMock extends StreamTransformation<Row> {
+
+		public StreamTransformationMock(String name, TypeInformation<Row> outputType, int parallelism) {
+			super(name, outputType, parallelism);
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+			// do nothing
+		}
+
+		@Override
+		public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+			return null;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// For version-specific tests
+	// --------------------------------------------------------------------------------------------
+
+	protected abstract String getKafkaVersion();
+
+	protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
+
+	protected abstract Class<?> getExpectedFlinkKafkaProducer();
+
+	protected abstract KafkaTableSource getExpectedKafkaTableSource(
+		TableSchema schema,
+		Optional<String> proctimeAttribute,
+		List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+		Map<String, String> fieldMapping,
+		String topic,
+		Properties properties,
+		DeserializationSchema<Row> deserializationSchema,
+		StartupMode startupMode,
+		Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+	protected abstract KafkaTableSink getExpectedKafkaTableSink(
+		TableSchema schema,
+		String topic,
+		Properties properties,
+		FlinkKafkaPartitioner<Row> partitioner,
+		SerializationSchema<Row> serializationSchema);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
index ab613a9..a7eaa48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
@@ -19,12 +19,26 @@
 package org.apache.flink.table.factories.utils
 
 import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.types.Row
 
 /**
   * Serialization schema for testing purposes.
   */
-class TestSerializationSchema extends SerializationSchema[Row] {
+class TestSerializationSchema(val typeInfo: TypeInformation[Row]) extends SerializationSchema[Row] {
 
   override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[TestSerializationSchema]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TestSerializationSchema =>
+      (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    31 * typeInfo.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
index 475cff9..39c268e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.factories.utils
 
 import java.util
 
-import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
 import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
-import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.table.factories.{DeserializationSchemaFactory, SerializationSchemaFactory, TableFormatFactoryServiceTest}
 import org.apache.flink.types.Row
 
 /**
@@ -31,7 +31,9 @@ import org.apache.flink.types.Row
   * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
   * This format does not support SPECIAL_PATH but supports schema derivation.
   */
-class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
+class TestTableFormatFactory
+  extends DeserializationSchemaFactory[Row]
+  with SerializationSchemaFactory[Row] {
 
   override def requiredContext(): util.Map[String, String] = {
     val context = new util.HashMap[String, String]()
@@ -62,4 +64,14 @@ class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
     val schema = SchemaValidator.deriveFormatFields(props)
     new TestDeserializationSchema(schema.toRowType)
   }
+
+  override def createSerializationSchema(
+      properties: util.Map[String, String])
+    : SerializationSchema[Row] = {
+
+    val props = new DescriptorProperties(true)
+    props.putProperties(properties)
+    val schema = SchemaValidator.deriveFormatFields(props)
+    new TestSerializationSchema(schema.toRowType)
+  }
 }


[2/2] flink git commit: [FLINK-9846] [table] Add a Kafka table sink factory

Posted by tw...@apache.org.
[FLINK-9846] [table] Add a Kafka table sink factory

Adds a Kafka table sink factory with format discovery. Currently, this enables
the SQL Client to write Avro and JSON data to Kafka. The functionality is
still limited due to FLINK-9870; in particular, time attributes cannot be used
in the output yet.
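
As a rough usage sketch (not part of this commit): writing a table to Kafka
through the new factory could look as follows. This assumes the descriptor-based
TableEnvironment#connect() API; the Json format descriptor, the
registerTableSink() call, and the topic/address values are illustrative
assumptions rather than code from this change.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class KafkaSinkUsageSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv =
			TableEnvironment.getTableEnvironment(env);

		// connection properties; the address is a placeholder
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

		tableEnv
			.connect(
				new Kafka()
					.version("0.10") // resolved to the Kafka 0.10 source/sink factory
					.topic("myTopic")
					.properties(kafkaProps))
			// format discovery: a SerializationSchemaFactory is looked up for the format
			.withFormat(new Json().deriveSchema())
			.withSchema(
				new Schema()
					.field("fruit-name", Types.STRING())
					.field("count", Types.DECIMAL()))
			.inAppendMode()
			// no time attributes in the output yet, see FLINK-9870
			.registerTableSink("kafkaSink");
	}
}

The same descriptor chain, registered as a source instead of a sink, goes
through the same factory class.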

Changes:
- Decouple Kafka sink from formats and deprecate old classes
- Add a Kafka table sink factory
- Existing tests for the KafkaTableSourceFactory have been
  generalized to support sinks as well.

This closes #6387.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57b3cde8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57b3cde8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57b3cde8

Branch: refs/heads/master
Commit: 57b3cde863922094be4f395063317e42349aedb3
Parents: 9e348d3
Author: Timo Walther <tw...@apache.org>
Authored: Mon Jul 23 08:12:00 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Jul 23 18:17:28 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010JsonTableSink.java |  19 +-
 .../connectors/kafka/Kafka010TableSink.java     |  61 ++++
 .../kafka/Kafka010TableSourceFactory.java       |  72 ----
 .../kafka/Kafka010TableSourceSinkFactory.java   |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka010JsonTableSinkTest.java        |   4 +
 .../kafka/Kafka010TableSourceFactoryTest.java   |  74 -----
 .../Kafka010TableSourceSinkFactoryTest.java     |  99 ++++++
 .../connectors/kafka/Kafka011TableSink.java     |  64 ++++
 .../connectors/kafka/Kafka011TableSource.java   |   3 +-
 .../kafka/Kafka011TableSourceFactory.java       |  72 ----
 .../kafka/Kafka011TableSourceSinkFactory.java   |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka011TableSourceFactoryTest.java   |  74 -----
 .../Kafka011TableSourceSinkFactoryTest.java     |  99 ++++++
 .../connectors/kafka/Kafka08JsonTableSink.java  |  19 +-
 .../connectors/kafka/Kafka08TableSink.java      |  61 ++++
 .../connectors/kafka/Kafka08TableSource.java    |   3 +-
 .../kafka/Kafka08TableSourceFactory.java        |  72 ----
 .../kafka/Kafka08TableSourceSinkFactory.java    |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   4 +
 .../kafka/Kafka08TableSourceFactoryTest.java    |  74 -----
 .../Kafka08TableSourceSinkFactoryTest.java      |  99 ++++++
 .../connectors/kafka/Kafka09JsonTableSink.java  |  19 +-
 .../connectors/kafka/Kafka09TableSink.java      |  61 ++++
 .../connectors/kafka/Kafka09TableSource.java    |   3 +-
 .../kafka/Kafka09TableSourceFactory.java        |  72 ----
 .../kafka/Kafka09TableSourceSinkFactory.java    |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   4 +
 .../kafka/Kafka09TableSourceFactoryTest.java    |  74 -----
 .../Kafka09TableSourceSinkFactoryTest.java      |  99 ++++++
 .../connectors/kafka/KafkaJsonTableSink.java    |   5 +
 .../connectors/kafka/KafkaTableSink.java        | 112 ++++++-
 .../kafka/KafkaTableSourceFactory.java          | 251 --------------
 .../kafka/KafkaTableSourceSinkFactoryBase.java  | 330 +++++++++++++++++++
 .../partitioner/FlinkFixedPartitioner.java      |  10 +
 .../kafka/KafkaTableSinkTestBase.java           |   7 +-
 .../kafka/KafkaTableSourceFactoryTestBase.java  | 196 -----------
 .../KafkaTableSourceSinkFactoryTestBase.java    | 299 +++++++++++++++++
 .../utils/TestSerializationSchema.scala         |  16 +-
 .../utils/TestTableFormatFactory.scala          |  18 +-
 43 files changed, 1852 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index ef33cd5..2ad3142 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -18,18 +18,23 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 
 	/**
@@ -46,7 +51,9 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka010JsonTableSink(String topic, Properties properties) {
 		super(topic, properties, new FlinkFixedPartitioner<>());
 	}
@@ -58,14 +65,20 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
 
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+		return new FlinkKafkaProducer010<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
new file mode 100644
index 0000000..a8c6553
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka010TableSink extends KafkaTableSink {
+
+	public Kafka010TableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		super(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer010<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
deleted file mode 100644
index 4a86016..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka010TableSource}.
- */
-public class Kafka010TableSourceFactory extends KafkaTableSourceFactory {
-
-	@Override
-	protected String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-	}
-
-	@Override
-	protected boolean supportsKafkaTimestamps() {
-		return true;
-	}
-
-	@Override
-	protected KafkaTableSource createKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka010TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
new file mode 100644
index 0000000..0cf9499
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010TableSource} and {@link Kafka010TableSink}.
+ */
+public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return true;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka010TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+
+	@Override
+	protected KafkaTableSink createKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka010TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 21f5707..9bb0363 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index af562c6..339420c 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka010JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
deleted file mode 100644
index ff3b0b0..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka010TableSource} created by {@link Kafka010TableSourceFactory}.
- */
-public class Kafka010TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-	@Override
-	protected String getKafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer010.class;
-	}
-
-	@Override
-	protected KafkaTableSource getExpectedKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka010TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..cc198c9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka010TableSource} and {@link Kafka010TableSink} created
+ * by {@link Kafka010TableSourceSinkFactory}.
+ */
+public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer010.class;
+	}
+
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer010.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka010TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka010TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
new file mode 100644
index 0000000..22c6da1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka 0.11 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka011TableSink extends KafkaTableSink {
+
+	public Kafka011TableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		super(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+
+	@Override
+	protected SinkFunction<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer011<>(
+			topic,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			properties,
+			Optional.of(partitioner));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 85f5669..a646317 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -58,7 +58,8 @@ public class Kafka011TableSource extends KafkaTableSource {
 			Optional<String> proctimeAttribute,
 			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
 			Optional<Map<String, String>> fieldMapping,
-			String topic, Properties properties,
+			String topic,
+			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
deleted file mode 100644
index b1e3929..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka011TableSource}.
- */
-public class Kafka011TableSourceFactory extends KafkaTableSourceFactory {
-
-	@Override
-	protected String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-	}
-
-	@Override
-	protected boolean supportsKafkaTimestamps() {
-		return true;
-	}
-
-	@Override
-	protected KafkaTableSource createKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka011TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
new file mode 100644
index 0000000..c26df42
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011TableSource} and {@link Kafka011TableSink}.
+ */
+public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return true;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka011TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+
+	@Override
+	protected KafkaTableSink createKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka011TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c056097..b59b4a7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
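
The service file is what makes the renamed factory discoverable: the table
environment locates TableFactory implementations through Java's ServiceLoader,
which scans exactly these META-INF/services entries on the classpath. A quick
ad-hoc check (ListTableFactories is a throwaway helper, not part of Flink)
from any classpath that contains the connector jar:

  import java.util.ServiceLoader;

  import org.apache.flink.table.factories.TableFactory;

  public final class ListTableFactories {
    public static void main(String[] args) {
      // Prints every factory registered via META-INF/services; after this
      // patch the list should contain
      // org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory.
      for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
        System.out.println(factory.getClass().getName());
      }
    }
  }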

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
deleted file mode 100644
index abaa490..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka011TableSource} created by {@link Kafka011TableSourceFactory}.
- */
-public class Kafka011TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-	@Override
-	protected String getKafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer011.class;
-	}
-
-	@Override
-	protected KafkaTableSource getExpectedKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka011TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..996c508
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka011TableSource} and {@link Kafka011TableSink} created
+ * by {@link Kafka011TableSourceSinkFactory}.
+ */
+public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer011.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka011TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka011TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema
+		);
+	}
+}
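
The shared test base drives both the source and the sink expectation from
descriptor-style properties. As a sketch only (the flat key names below are an
assumption about what the test descriptors translate to, following the
connector.* conventions checked by KafkaValidator), the map that selects this
factory would look roughly like:

  import java.util.HashMap;
  import java.util.Map;

  Map<String, String> properties = new HashMap<>();
  properties.put("connector.type", "kafka");
  properties.put("connector.version", "0.11");
  properties.put("connector.topic", "test-topic");
  properties.put("connector.properties.0.key", "bootstrap.servers");
  properties.put("connector.properties.0.value", "localhost:9092");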

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index c60288d..45588cd 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
 	/**
@@ -48,7 +53,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka08JsonTableSink(String topic, Properties properties) {
 		super(topic, properties, new FlinkFixedPartitioner<>());
 	}
@@ -60,7 +67,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
@@ -84,7 +93,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+		return new FlinkKafkaProducer08<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
 	}
 
 	@Override
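
In place of the deprecated JSON sink above, the @deprecated notice points to
the descriptor API. A minimal sink-side sketch, reusing the environment setup
from the source sketch earlier and the same hedged assumptions about
descriptor method names (for 0.8, zookeeper.connect is additionally required):

  tableEnv.connect(
      new Kafka()
        .version("0.8")
        .topic("clicks")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
      .field("user", Types.STRING())
      .field("cnt", Types.LONG()))
    .inAppendMode()
    .registerTableSink("clicksSink");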

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
new file mode 100644
index 0000000..c34de13
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.8 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka08TableSink extends KafkaTableSink {
+
+	public Kafka08TableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		super(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer08<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
+	}
+}
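
For reference, the createKafkaProducer hook above is only invoked by the base
KafkaTableSink when the Table API emits a stream. Roughly, as a paraphrase of
the base class (which is not part of this hunk):

  @Override
  public void emitDataStream(DataStream<Row> dataStream) {
    // createKafkaProducer returns the version-specific producer, here a
    // FlinkKafkaProducer08 configured with the sink's topic and partitioner.
    SinkFunction<Row> kafkaProducer =
      createKafkaProducer(topic, properties, serializationSchema, partitioner);
    dataStream.addSink(kafkaProducer);
  }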

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 1a025b8..97c293e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -58,7 +58,8 @@ public class Kafka08TableSource extends KafkaTableSource {
 			Optional<String> proctimeAttribute,
 			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
 			Optional<Map<String, String>> fieldMapping,
-			String topic, Properties properties,
+			String topic,
+			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
deleted file mode 100644
index cd33751..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka08TableSource}.
- */
-public class Kafka08TableSourceFactory extends KafkaTableSourceFactory {
-
-	@Override
-	protected String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-	}
-
-	@Override
-	protected boolean supportsKafkaTimestamps() {
-		return false;
-	}
-
-	@Override
-	protected KafkaTableSource createKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka08TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
new file mode 100644
index 0000000..3e93b6f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08TableSource} and
+ * {@link Kafka08TableSink}.
+ */
+public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return false;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka08TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+
+	@Override
+	protected KafkaTableSink createKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka08TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b83bb3f..f2e1c3f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 53da9f6..32bd3b6 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka08JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
deleted file mode 100644
index d939d88..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka08TableSource} created by {@link Kafka08TableSourceFactory}.
- */
-public class Kafka08TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-	@Override
-	protected String getKafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-		return (Class) FlinkKafkaConsumer08.class;
-	}
-
-	@Override
-	protected KafkaTableSource getExpectedKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka08TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..b67501e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka08TableSource} and {@link Kafka08TableSink} created
+ * by {@link Kafka08TableSourceSinkFactory}.
+ */
+public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer08.class;
+	}
+
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer08.class;
+	}
+
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka08TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets
+		);
+	}
+
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka08TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 95ce4e6..b3cc0aa 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
 	/**
@@ -48,7 +53,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka09JsonTableSink(String topic, Properties properties) {
 		super(topic, properties, new FlinkFixedPartitioner<>());
 	}
@@ -60,7 +67,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
+	@Deprecated
 	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
@@ -84,7 +93,11 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+		return new FlinkKafkaProducer09<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
new file mode 100644
index 0000000..8c349d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.9 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka09TableSink extends KafkaTableSink {
+
+	public Kafka09TableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		super(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+
+	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer09<>(
+			topic,
+			serializationSchema,
+			properties,
+			partitioner);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 18bc1c4..8f9e799 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -58,7 +58,8 @@ public class Kafka09TableSource extends KafkaTableSource {
 			Optional<String> proctimeAttribute,
 			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
 			Optional<Map<String, String>> fieldMapping,
-			String topic, Properties properties,
+			String topic,
+			Properties properties,
 			DeserializationSchema<Row> deserializationSchema,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
deleted file mode 100644
index 14c52fd..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka09TableSource}.
- */
-public class Kafka09TableSourceFactory extends KafkaTableSourceFactory {
-
-	@Override
-	protected String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
-	}
-
-	@Override
-	protected boolean supportsKafkaTimestamps() {
-		return false;
-	}
-
-	@Override
-	protected KafkaTableSource createKafkaTableSource(
-			TableSchema schema,
-			Optional<String> proctimeAttribute,
-			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-			Map<String, String> fieldMapping,
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-		return new Kafka09TableSource(
-			schema,
-			proctimeAttribute,
-			rowtimeAttributeDescriptors,
-			Optional.of(fieldMapping),
-			topic,
-			properties,
-			deserializationSchema,
-			startupMode,
-			specificStartupOffsets);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
new file mode 100644
index 0000000..9958b4e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09TableSource} and
+ * {@link Kafka09TableSink}.
+ */
+public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+	}
+
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return false;
+	}
+
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		return new Kafka09TableSource(
+			schema,
+			proctimeAttribute,
+			rowtimeAttributeDescriptors,
+			Optional.of(fieldMapping),
+			topic,
+			properties,
+			deserializationSchema,
+			startupMode,
+			specificStartupOffsets);
+	}
+
+	@Override
+	protected KafkaTableSink createKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka09TableSink(
+			schema,
+			topic,
+			properties,
+			partitioner,
+			serializationSchema);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index fb14ddb..2625873 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 610e048..79f251b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka09JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override