You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/23 16:18:32 UTC
[1/2] flink git commit: [FLINK-9846] [table] Add a Kafka table sink factory
Repository: flink
Updated Branches:
refs/heads/master 9e348d32c -> 57b3cde86
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
deleted file mode 100644
index b976e14..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka09TableSource} created by {@link Kafka09TableSourceFactory}.
- */
-public class Kafka09TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
- @Override
- protected String getKafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer09.class;
- }
-
- @Override
- protected KafkaTableSource getExpectedKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka09TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..a6c8bd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka09TableSource} and {@link Kafka09TableSink} created
+ * by {@link Kafka09TableSourceSinkFactory}.
+ */
+public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+ @Override
+ protected String getKafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer09.class;
+ }
+
+ @Override
+ protected Class<?> getExpectedFlinkKafkaProducer() {
+ return FlinkKafkaProducer09.class;
+ }
+
+ @Override
+ protected KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka09TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+
+ @Override
+ protected KafkaTableSink getExpectedKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka09TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ec27398..231eddd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -29,7 +29,10 @@ import java.util.Properties;
/**
* Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+@Deprecated
@Internal
public abstract class KafkaJsonTableSink extends KafkaTableSink {
@@ -39,7 +42,9 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 687df58..7853bb7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -40,27 +45,59 @@ import java.util.Properties;
@Internal
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
+ // TODO make all attributes final and mandatory once we drop support for format-specific table sinks
+
+ /** The schema of the table. */
+ private final Optional<TableSchema> schema;
+
+ /** The Kafka topic to write to. */
protected final String topic;
+
+ /** Properties for the Kafka producer. */
protected final Properties properties;
- protected SerializationSchema<Row> serializationSchema;
+
+ /** Serialization schema for encoding records to Kafka. */
+ protected Optional<SerializationSchema<Row>> serializationSchema;
+
+ /** Partitioner to select Kafka partition for each item. */
protected final FlinkKafkaPartitioner<Row> partitioner;
+
+ // legacy variables
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
+ protected KafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+ this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
+ this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+ this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+ this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
+ this.serializationSchema = Optional.of(Preconditions.checkNotNull(
+ serializationSchema, "Serialization schema must not be null."));
+ }
+
/**
* Creates KafkaTableSink.
*
* @param topic Kafka topic to write to.
- * @param properties Properties for the Kafka consumer.
+ * @param properties Properties for the Kafka producer.
* @param partitioner Partitioner to select Kafka partition for each item
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public KafkaTableSink(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
+ this.schema = Optional.empty();
this.topic = Preconditions.checkNotNull(topic, "topic");
this.properties = Preconditions.checkNotNull(properties, "properties");
this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+ this.serializationSchema = Optional.empty();
}
/**
@@ -72,8 +109,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
* @param partitioner Partitioner to select Kafka partition.
* @return The version-specific Kafka producer
*/
- protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic, Properties properties,
+ protected abstract SinkFunction<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
SerializationSchema<Row> serializationSchema,
FlinkKafkaPartitioner<Row> partitioner);
@@ -82,40 +120,57 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
*
* @param rowSchema the schema of the row to serialize.
* @return Instance of serialization schema
+ * @deprecated Use the constructor to pass a serialization schema instead.
*/
- protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+ @Deprecated
+ protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+ throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+ }
/**
* Create a deep copy of this sink.
*
* @return Deep copy of this sink
*/
- protected abstract KafkaTableSink createCopy();
+ @Deprecated
+ protected KafkaTableSink createCopy() {
+ throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+ }
@Override
public void emitDataStream(DataStream<Row> dataStream) {
- FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
- // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
- kafkaProducer.setFlushOnCheckpoint(true);
+ SinkFunction<Row> kafkaProducer = createKafkaProducer(
+ topic,
+ properties,
+ serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
+ partitioner);
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
}
@Override
public TypeInformation<Row> getOutputType() {
- return new RowTypeInfo(getFieldTypes());
+ return schema
+ .map(TableSchema::toRowType)
+ .orElseGet(() -> new RowTypeInfo(getFieldTypes()));
}
public String[] getFieldNames() {
- return fieldNames;
+ return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
}
@Override
public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
+ return schema.map(TableSchema::getTypes).orElse(fieldTypes);
}
@Override
public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ if (schema.isPresent()) {
+ // a fixed schema is defined so reconfiguration is not supported
+ throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
+ }
+
+ // legacy code
KafkaTableSink copy = createCopy();
copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
@@ -123,8 +178,39 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
"Number of provided field names and types does not match.");
RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
- copy.serializationSchema = createSerializationSchema(rowSchema);
+ copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema));
return copy;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaTableSink that = (KafkaTableSink) o;
+ return Objects.equals(schema, that.schema) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(properties, that.properties) &&
+ Objects.equals(serializationSchema, that.serializationSchema) &&
+ Objects.equals(partitioner, that.partitioner) &&
+ Arrays.equals(fieldNames, that.fieldNames) &&
+ Arrays.equals(fieldTypes, that.fieldTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(
+ schema,
+ topic,
+ properties,
+ serializationSchema,
+ partitioner);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ result = 31 * result + Arrays.hashCode(fieldTypes);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
deleted file mode 100644
index d7e42f5..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-
-/**
- * Factory for creating configured instances of {@link KafkaTableSource}.
- */
-public abstract class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> {
-
- @Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = new HashMap<>();
- context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
- context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
- context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
- context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
- return context;
- }
-
- @Override
- public List<String> supportedProperties() {
- List<String> properties = new ArrayList<>();
-
- // kafka
- properties.add(CONNECTOR_TOPIC);
- properties.add(CONNECTOR_PROPERTIES);
- properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
- properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
- properties.add(CONNECTOR_STARTUP_MODE);
- properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
- properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
-
- // schema
- properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
- properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
- properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
-
- // time attributes
- properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
- properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
- properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
-
- // format wildcard
- properties.add(FORMAT() + ".*");
-
- return properties;
- }
-
- @Override
- public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
- final DescriptorProperties params = new DescriptorProperties(true);
- params.putProperties(properties);
-
- // validate
- // allow Kafka timestamps to be used, watermarks can not be received from source
- new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
- new KafkaValidator().validate(params);
-
- // deserialization schema using format discovery
- final DeserializationSchemaFactory<?> formatFactory = TableFactoryService.find(
- DeserializationSchemaFactory.class,
- properties,
- this.getClass().getClassLoader());
- @SuppressWarnings("unchecked")
- final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
- .createDeserializationSchema(properties);
-
- // schema
- final TableSchema schema = params.getTableSchema(SCHEMA());
-
- // proctime
- final Optional<String> proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params);
-
- // rowtime
- final List<RowtimeAttributeDescriptor> rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(params);
-
- // field mapping
- final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(params, Optional.of(schema));
-
- // properties
- final Properties kafkaProperties = new Properties();
- final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
- CONNECTOR_PROPERTIES,
- Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
- propsList.forEach(kv -> kafkaProperties.put(
- params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
- params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
- ));
-
- // topic
- final String topic = params.getString(CONNECTOR_TOPIC);
-
- // startup mode
- final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
- final StartupMode startupMode = params
- .getOptionalString(CONNECTOR_STARTUP_MODE)
- .map(modeString -> {
- switch (modeString) {
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
- return StartupMode.EARLIEST;
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
- return StartupMode.LATEST;
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
- return StartupMode.GROUP_OFFSETS;
-
- case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
- final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
- CONNECTOR_SPECIFIC_OFFSETS,
- Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
- offsetList.forEach(kv -> {
- final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
- final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
- final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
- specificOffsets.put(topicPartition, offset);
- });
- return StartupMode.SPECIFIC_OFFSETS;
- default:
- throw new TableException("Unsupported startup mode. Validator should have checked that.");
- }
- }).orElse(StartupMode.GROUP_OFFSETS);
-
- return createKafkaTableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributes,
- fieldMapping,
- topic,
- kafkaProperties,
- deserializationSchema,
- startupMode,
- specificOffsets);
- }
-
- // --------------------------------------------------------------------------------------------
- // For version-specific factories
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the Kafka version.
- */
- protected abstract String kafkaVersion();
-
- /**
- * True if the Kafka source supports Kafka timestamps, false otherwise.
- *
- * @return True if the Kafka source supports Kafka timestamps, false otherwise.
- */
- protected abstract boolean supportsKafkaTimestamps();
-
- /**
- * Constructs the version-specific Kafka table source.
- *
- * @param schema Schema of the produced table.
- * @param proctimeAttribute Field name of the processing time attribute.
- * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
- * @param fieldMapping Mapping for the fields of the table schema to
- * fields of the physical returned type.
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema for decoding records from Kafka.
- * @param startupMode Startup mode for the contained consumer.
- * @param specificStartupOffsets Specific startup offsets; only relevant when startup
- * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
- */
- protected abstract KafkaTableSource createKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic, Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
new file mode 100644
index 0000000..3307994
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory for creating configured instances of {@link KafkaTableSource} and {@link KafkaTableSink}.
+ */
+public abstract class KafkaTableSourceSinkFactoryBase implements
+ StreamTableSourceFactory<Row>,
+ StreamTableSinkFactory<Row> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
+ context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+ context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
+ context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ // kafka
+ properties.add(CONNECTOR_TOPIC);
+ properties.add(CONNECTOR_PROPERTIES);
+ properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
+ properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
+ properties.add(CONNECTOR_STARTUP_MODE);
+ properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+ properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+
+ // schema
+ properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+ properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+ properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
+
+ // time attributes
+ properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
+ properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
+
+ // format wildcard
+ properties.add(FORMAT() + ".*");
+
+ return properties;
+ }
+
+ @Override
+ public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+ final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+ final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+ final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
+
+ return createKafkaTableSource(
+ schema,
+ SchemaValidator.deriveProctimeAttribute(descriptorProperties),
+ SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
+ SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+ topic,
+ getKafkaProperties(descriptorProperties),
+ getDeserializationSchema(properties),
+ startupOptions.startupMode,
+ startupOptions.specificOffsets);
+ }
+
+ @Override
+ public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+ final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+ final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+ final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(descriptorProperties);
+ final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
+ SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
+
+ // see also FLINK-9870
+ if (proctime.isPresent() || !rowtimeAttributeDescriptors.isEmpty() ||
+ checkForCustomFieldMapping(descriptorProperties, schema)) {
+ throw new TableException("Time attributes and custom field mappings are not supported yet.");
+ }
+
+ return createKafkaTableSink(
+ schema,
+ topic,
+ getKafkaProperties(descriptorProperties),
+ getFlinkKafkaPartitioner(),
+ getSerializationSchema(properties));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // For version-specific factories
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the Kafka version.
+ */
+ protected abstract String kafkaVersion();
+
+ /**
+ * True if the Kafka source supports Kafka timestamps, false otherwise.
+ *
+ * @return True if the Kafka source supports Kafka timestamps, false otherwise.
+ */
+ protected abstract boolean supportsKafkaTimestamps();
+
+ /**
+ * Constructs the version-specific Kafka table source.
+ *
+ * @param schema Schema of the produced table.
+ * @param proctimeAttribute Field name of the processing time attribute.
+ * @param rowtimeAttributeDescriptors Descriptors for the rowtime attributes.
+ * @param fieldMapping Mapping for the fields of the table schema to
+ * fields of the physical returned type.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka.
+ * @param startupMode Startup mode for the contained consumer.
+ * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+ * mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+ */
+ protected abstract KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+ /**
+ * Constructs the version-specific Kafka table sink.
+ *
+ * @param schema Schema of the produced table.
+ * @param topic Kafka topic to write to.
+ * @param properties Properties for the Kafka producer.
+ * @param partitioner Partitioner to select Kafka partition for each item.
+ */
+ protected abstract KafkaTableSink createKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema);
+
+ // --------------------------------------------------------------------------------------------
+ // Helper methods
+ // --------------------------------------------------------------------------------------------
+
+ private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+
+ // allow Kafka timestamps to be used, watermarks can not be received from source
+ new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(descriptorProperties);
+ new KafkaValidator().validate(descriptorProperties);
+
+ return descriptorProperties;
+ }
+
+ private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) {
+ @SuppressWarnings("unchecked")
+ final DeserializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
+ DeserializationSchemaFactory.class,
+ properties,
+ this.getClass().getClassLoader());
+ return formatFactory.createDeserializationSchema(properties);
+ }
+
+ private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
+ @SuppressWarnings("unchecked")
+ final SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
+ SerializationSchemaFactory.class,
+ properties,
+ this.getClass().getClassLoader());
+ return formatFactory.createSerializationSchema(properties);
+ }
+
+ private Properties getKafkaProperties(DescriptorProperties descriptorProperties) {
+ final Properties kafkaProperties = new Properties();
+ final List<Map<String, String>> propsList = descriptorProperties.getFixedIndexedProperties(
+ CONNECTOR_PROPERTIES,
+ Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
+ propsList.forEach(kv -> kafkaProperties.put(
+ descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+ descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+ ));
+ return kafkaProperties;
+ }
+
+ private StartupOptions getStartupOptions(
+ DescriptorProperties descriptorProperties,
+ String topic) {
+ final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+ final StartupMode startupMode = descriptorProperties
+ .getOptionalString(CONNECTOR_STARTUP_MODE)
+ .map(modeString -> {
+ switch (modeString) {
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
+ return StartupMode.EARLIEST;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
+ return StartupMode.LATEST;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+ return StartupMode.GROUP_OFFSETS;
+
+ case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+ final List<Map<String, String>> offsetList = descriptorProperties.getFixedIndexedProperties(
+ CONNECTOR_SPECIFIC_OFFSETS,
+ Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+ offsetList.forEach(kv -> {
+ final int partition = descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+ final long offset = descriptorProperties.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+ final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+ specificOffsets.put(topicPartition, offset);
+ });
+ return StartupMode.SPECIFIC_OFFSETS;
+ default:
+ throw new TableException("Unsupported startup mode. Validator should have checked that.");
+ }
+ }).orElse(StartupMode.GROUP_OFFSETS);
+ final StartupOptions options = new StartupOptions();
+ options.startupMode = startupMode;
+ options.specificOffsets = specificOffsets;
+ return options;
+ }
+
+ private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
+ // we don't support custom partitioner so far
+ return new FlinkFixedPartitioner<>();
+ }
+
+ private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
+ final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+ return fieldMapping.size() != schema.getColumnNames().length ||
+ !fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue()));
+ }
+
+ private static class StartupOptions {
+ private StartupMode startupMode;
+ private Map<KafkaTopicPartition, Long> specificOffsets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index 6e83ddd..7e0d1fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -74,4 +74,14 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
return partitions[parallelInstanceId % partitions.length];
}
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o || o instanceof FlinkFixedPartitioner;
+ }
+
+ @Override
+ public int hashCode() {
+ return FlinkFixedPartitioner.class.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index a87c622..946b6eb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -44,7 +44,11 @@ import static org.mockito.Mockito.when;
/**
* Abstract test base for all Kafka table sink tests.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sinks.
*/
+@Deprecated
public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
@@ -94,7 +98,8 @@ public abstract class KafkaTableSinkTestBase {
protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+ KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+ return sink.configure(FIELD_NAMES, FIELD_TYPES);
}
private static Properties createSinkProperties() {
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
deleted file mode 100644
index 96f1607..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Kafka;
-import org.apache.flink.table.descriptors.Rowtime;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableDescriptor;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.factories.utils.TestDeserializationSchema;
-import org.apache.flink.table.factories.utils.TestTableFormat;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test base for {@link KafkaTableSourceFactory}.
- */
-public abstract class KafkaTableSourceFactoryTestBase extends TestLogger {
-
- private static final String TOPIC = "myTopic";
- private static final int PARTITION_0 = 0;
- private static final long OFFSET_0 = 100L;
- private static final int PARTITION_1 = 1;
- private static final long OFFSET_1 = 123L;
- private static final String FRUIT_NAME = "fruit-name";
- private static final String NAME = "name";
- private static final String COUNT = "count";
- private static final String TIME = "time";
- private static final String EVENT_TIME = "event-time";
- private static final String PROC_TIME = "proc-time";
- private static final Properties KAFKA_PROPERTIES = new Properties();
- static {
- KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
- KAFKA_PROPERTIES.setProperty("group.id", "dummy");
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testTableSource() {
-
- // prepare parameters for Kafka table source
-
- final TableSchema schema = TableSchema.builder()
- .field(FRUIT_NAME, Types.STRING())
- .field(COUNT, Types.DECIMAL())
- .field(EVENT_TIME, Types.SQL_TIMESTAMP())
- .field(PROC_TIME, Types.SQL_TIMESTAMP())
- .build();
-
- final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
- new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
-
- final Map<String, String> fieldMapping = new HashMap<>();
- fieldMapping.put(FRUIT_NAME, NAME);
- fieldMapping.put(COUNT, COUNT);
-
- final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
- specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
- specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
-
- final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
- TableSchema.builder()
- .field(NAME, Types.STRING())
- .field(COUNT, Types.DECIMAL())
- .field(TIME, Types.SQL_TIMESTAMP())
- .build()
- .toRowType()
- );
-
- final StartupMode startupMode = StartupMode.SPECIFIC_OFFSETS;
-
- final KafkaTableSource expected = getExpectedKafkaTableSource(
- schema,
- Optional.of(PROC_TIME),
- rowtimeAttributeDescriptors,
- fieldMapping,
- TOPIC,
- KAFKA_PROPERTIES,
- deserializationSchema,
- startupMode,
- specificOffsets);
-
- // construct table source using descriptors and table source factory
-
- final Map<Integer, Long> offsets = new HashMap<>();
- offsets.put(PARTITION_0, OFFSET_0);
- offsets.put(PARTITION_1, OFFSET_1);
-
- final TestTableDescriptor testDesc = new TestTableDescriptor(
- new Kafka()
- .version(getKafkaVersion())
- .topic(TOPIC)
- .properties(KAFKA_PROPERTIES)
- .startFromSpecificOffsets(offsets))
- .withFormat(new TestTableFormat())
- .withSchema(
- new Schema()
- .field(FRUIT_NAME, Types.STRING()).from(NAME)
- .field(COUNT, Types.DECIMAL()) // no from so it must match with the input
- .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
- new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
- .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
- .inAppendMode();
- final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- testDesc.addProperties(descriptorProperties);
- final Map<String, String> propertiesMap = descriptorProperties.asMap();
-
- final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
- .createStreamTableSource(propertiesMap);
-
- assertEquals(expected, actualSource);
-
- // test Kafka consumer
- final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
- final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
- actualKafkaSource.getDataStream(mock);
- assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.function.getClass()));
- }
-
- private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
-
- public SourceFunction<?> function;
-
- @Override
- public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
- this.function = function;
- return super.addSource(function);
- }
-
- @Override
- public JobExecutionResult execute(String jobName) {
- throw new UnsupportedOperationException();
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // For version-specific tests
- // --------------------------------------------------------------------------------------------
-
- protected abstract String getKafkaVersion();
-
- protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
-
- protected abstract KafkaTableSource getExpectedKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic, Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
new file mode 100644
index 0000000..d8e8f7d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.utils.TestDeserializationSchema;
+import org.apache.flink.table.factories.utils.TestSerializationSchema;
+import org.apache.flink.table.factories.utils.TestTableFormat;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Abstract test base for {@link KafkaTableSourceSinkFactoryBase}.
+ */
+public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
+
+ private static final String TOPIC = "myTopic";
+ private static final int PARTITION_0 = 0;
+ private static final long OFFSET_0 = 100L;
+ private static final int PARTITION_1 = 1;
+ private static final long OFFSET_1 = 123L;
+ private static final String FRUIT_NAME = "fruit-name";
+ private static final String NAME = "name";
+ private static final String COUNT = "count";
+ private static final String TIME = "time";
+ private static final String EVENT_TIME = "event-time";
+ private static final String PROC_TIME = "proc-time";
+ private static final Properties KAFKA_PROPERTIES = new Properties();
+ static {
+ KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
+ KAFKA_PROPERTIES.setProperty("group.id", "dummy");
+ KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+ }
+
+ private static final Map<Integer, Long> OFFSETS = new HashMap<>();
+ static {
+ OFFSETS.put(PARTITION_0, OFFSET_0);
+ OFFSETS.put(PARTITION_1, OFFSET_1);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTableSource() {
+
+ // prepare parameters for Kafka table source
+
+ final TableSchema schema = TableSchema.builder()
+ .field(FRUIT_NAME, Types.STRING())
+ .field(COUNT, Types.DECIMAL())
+ .field(EVENT_TIME, Types.SQL_TIMESTAMP())
+ .field(PROC_TIME, Types.SQL_TIMESTAMP())
+ .build();
+
+ final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+ new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+ final Map<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put(FRUIT_NAME, NAME);
+ fieldMapping.put(COUNT, COUNT);
+
+ final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+ specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+ specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+ final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
+ TableSchema.builder()
+ .field(NAME, Types.STRING())
+ .field(COUNT, Types.DECIMAL())
+ .field(TIME, Types.SQL_TIMESTAMP())
+ .build()
+ .toRowType()
+ );
+
+ final KafkaTableSource expected = getExpectedKafkaTableSource(
+ schema,
+ Optional.of(PROC_TIME),
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ TOPIC,
+ KAFKA_PROPERTIES,
+ deserializationSchema,
+ StartupMode.SPECIFIC_OFFSETS,
+ specificOffsets);
+
+ // construct table source using descriptors and table source factory
+
+ final TestTableDescriptor testDesc = new TestTableDescriptor(
+ new Kafka()
+ .version(getKafkaVersion())
+ .topic(TOPIC)
+ .properties(KAFKA_PROPERTIES)
+ .startFromSpecificOffsets(OFFSETS))
+ .withFormat(new TestTableFormat())
+ .withSchema(
+ new Schema()
+ .field(FRUIT_NAME, Types.STRING()).from(NAME)
+ .field(COUNT, Types.DECIMAL()) // no from so it must match with the input
+ .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
+ new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
+ .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
+ .inAppendMode();
+
+ final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+ final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+ .createStreamTableSource(propertiesMap);
+
+ assertEquals(expected, actualSource);
+
+ // test Kafka consumer
+ final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
+ final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+ actualKafkaSource.getDataStream(mock);
+ assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+ }
+
+ /**
+ * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
+ */
+ @Test
+ public void testTableSink() {
+ // prepare parameters for Kafka table sink
+
+ final TableSchema schema = TableSchema.builder()
+ .field(FRUIT_NAME, Types.STRING())
+ .field(COUNT, Types.DECIMAL())
+ .field(EVENT_TIME, Types.SQL_TIMESTAMP())
+ .build();
+
+ final KafkaTableSink expected = getExpectedKafkaTableSink(
+ schema,
+ TOPIC,
+ KAFKA_PROPERTIES,
+ new FlinkFixedPartitioner<>(), // a custom partitioner is not supported yet
+ new TestSerializationSchema(schema.toRowType()));
+
+ // construct table sink using descriptors and table sink factory
+
+ final TestTableDescriptor testDesc = new TestTableDescriptor(
+ new Kafka()
+ .version(getKafkaVersion())
+ .topic(TOPIC)
+ .properties(KAFKA_PROPERTIES)
+ .startFromSpecificOffsets(OFFSETS)) // test if they are accepted although not needed
+ .withFormat(new TestTableFormat())
+ .withSchema(
+ new Schema()
+ .field(FRUIT_NAME, Types.STRING())
+ .field(COUNT, Types.DECIMAL())
+ .field(EVENT_TIME, Types.SQL_TIMESTAMP()))
+ .inAppendMode();
+
+ final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+ final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
+ .createStreamTableSink(propertiesMap);
+
+ assertEquals(expected, actualSink);
+
+ // test Kafka producer
+ final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink;
+ final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
+ actualKafkaSink.emitDataStream(streamMock);
+ assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
+ }
+
+ private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
+
+ public SourceFunction<?> sourceFunction;
+
+ @Override
+ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
+ this.sourceFunction = sourceFunction;
+ return super.addSource(sourceFunction);
+ }
+
+ @Override
+ public JobExecutionResult execute(String jobName) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class DataStreamMock extends DataStream<Row> {
+
+ public SinkFunction<?> sinkFunction;
+
+ public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Row> outType) {
+ super(environment, new StreamTransformationMock("name", outType, 1));
+ }
+
+ @Override
+ public DataStreamSink<Row> addSink(SinkFunction<Row> sinkFunction) {
+ this.sinkFunction = sinkFunction;
+ return super.addSink(sinkFunction);
+ }
+ }
+
+ private static class StreamTransformationMock extends StreamTransformation<Row> {
+
+ public StreamTransformationMock(String name, TypeInformation<Row> outputType, int parallelism) {
+ super(name, outputType, parallelism);
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ // do nothing
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ return null;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // For version-specific tests
+ // --------------------------------------------------------------------------------------------
+
+ protected abstract String getKafkaVersion();
+
+ protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
+
+ protected abstract Class<?> getExpectedFlinkKafkaProducer();
+
+ protected abstract KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+ protected abstract KafkaTableSink getExpectedKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
index ab613a9..a7eaa48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
@@ -19,12 +19,26 @@
package org.apache.flink.table.factories.utils
import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
/**
* Serialization schema for testing purposes.
*/
-class TestSerializationSchema extends SerializationSchema[Row] {
+class TestSerializationSchema(val typeInfo: TypeInformation[Row]) extends SerializationSchema[Row] {
override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[TestSerializationSchema]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: TestSerializationSchema =>
+ (that canEqual this) &&
+ typeInfo == that.typeInfo
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ 31 * typeInfo.hashCode()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
index 475cff9..39c268e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.factories.utils
import java.util
-import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
-import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.table.factories.{DeserializationSchemaFactory, SerializationSchemaFactory, TableFormatFactoryServiceTest}
import org.apache.flink.types.Row
/**
@@ -31,7 +31,9 @@ import org.apache.flink.types.Row
* It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
* This format does not support SPECIAL_PATH but supports schema derivation.
*/
-class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
+class TestTableFormatFactory
+ extends DeserializationSchemaFactory[Row]
+ with SerializationSchemaFactory[Row] {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
@@ -62,4 +64,14 @@ class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
val schema = SchemaValidator.deriveFormatFields(props)
new TestDeserializationSchema(schema.toRowType)
}
+
+ override def createSerializationSchema(
+ properties: util.Map[String, String])
+ : SerializationSchema[Row] = {
+
+ val props = new DescriptorProperties(true)
+ props.putProperties(properties)
+ val schema = SchemaValidator.deriveFormatFields(props)
+ new TestSerializationSchema(schema.toRowType)
+ }
}
[2/2] flink git commit: [FLINK-9846] [table] Add a Kafka table sink
factory
Posted by tw...@apache.org.
[FLINK-9846] [table] Add a Kafka table sink factory
Adds a Kafka table sink factory with format discovery. Currently, this enables
the SQL Client to write Avro and JSON data to Kafka. The functionality is
limited due to FLINK-9870. Therefore, it is currently not possible
to use time attributes in the output.
Changes:
- Decouple Kafka sink from formats and deprecate old classes
- Add a Kafka table sink factory
- Existing tests for the KafkaTableSourceFactory have been
generalized to support sinks as well.
This closes #6387.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57b3cde8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57b3cde8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57b3cde8
Branch: refs/heads/master
Commit: 57b3cde863922094be4f395063317e42349aedb3
Parents: 9e348d3
Author: Timo Walther <tw...@apache.org>
Authored: Mon Jul 23 08:12:00 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Jul 23 18:17:28 2018 +0200
----------------------------------------------------------------------
.../connectors/kafka/Kafka010JsonTableSink.java | 19 +-
.../connectors/kafka/Kafka010TableSink.java | 61 ++++
.../kafka/Kafka010TableSourceFactory.java | 72 ----
.../kafka/Kafka010TableSourceSinkFactory.java | 90 +++++
...rg.apache.flink.table.factories.TableFactory | 2 +-
.../kafka/Kafka010JsonTableSinkTest.java | 4 +
.../kafka/Kafka010TableSourceFactoryTest.java | 74 -----
.../Kafka010TableSourceSinkFactoryTest.java | 99 ++++++
.../connectors/kafka/Kafka011TableSink.java | 64 ++++
.../connectors/kafka/Kafka011TableSource.java | 3 +-
.../kafka/Kafka011TableSourceFactory.java | 72 ----
.../kafka/Kafka011TableSourceSinkFactory.java | 90 +++++
...rg.apache.flink.table.factories.TableFactory | 2 +-
.../kafka/Kafka011TableSourceFactoryTest.java | 74 -----
.../Kafka011TableSourceSinkFactoryTest.java | 99 ++++++
.../connectors/kafka/Kafka08JsonTableSink.java | 19 +-
.../connectors/kafka/Kafka08TableSink.java | 61 ++++
.../connectors/kafka/Kafka08TableSource.java | 3 +-
.../kafka/Kafka08TableSourceFactory.java | 72 ----
.../kafka/Kafka08TableSourceSinkFactory.java | 90 +++++
...rg.apache.flink.table.factories.TableFactory | 2 +-
.../kafka/Kafka08JsonTableSinkTest.java | 4 +
.../kafka/Kafka08TableSourceFactoryTest.java | 74 -----
.../Kafka08TableSourceSinkFactoryTest.java | 99 ++++++
.../connectors/kafka/Kafka09JsonTableSink.java | 19 +-
.../connectors/kafka/Kafka09TableSink.java | 61 ++++
.../connectors/kafka/Kafka09TableSource.java | 3 +-
.../kafka/Kafka09TableSourceFactory.java | 72 ----
.../kafka/Kafka09TableSourceSinkFactory.java | 90 +++++
...rg.apache.flink.table.factories.TableFactory | 2 +-
.../kafka/Kafka09JsonTableSinkTest.java | 4 +
.../kafka/Kafka09TableSourceFactoryTest.java | 74 -----
.../Kafka09TableSourceSinkFactoryTest.java | 99 ++++++
.../connectors/kafka/KafkaJsonTableSink.java | 5 +
.../connectors/kafka/KafkaTableSink.java | 112 ++++++-
.../kafka/KafkaTableSourceFactory.java | 251 --------------
.../kafka/KafkaTableSourceSinkFactoryBase.java | 330 +++++++++++++++++++
.../partitioner/FlinkFixedPartitioner.java | 10 +
.../kafka/KafkaTableSinkTestBase.java | 7 +-
.../kafka/KafkaTableSourceFactoryTestBase.java | 196 -----------
.../KafkaTableSourceSinkFactoryTestBase.java | 299 +++++++++++++++++
.../utils/TestSerializationSchema.scala | 16 +-
.../utils/TestTableFormatFactory.scala | 18 +-
43 files changed, 1852 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index ef33cd5..2ad3142 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -18,18 +18,23 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka010JsonTableSink extends KafkaJsonTableSink {
/**
@@ -46,7 +51,9 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka010JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
@@ -58,14 +65,20 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+ return new FlinkKafkaProducer010<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
new file mode 100644
index 0000000..a8c6553
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka010TableSink extends KafkaTableSink {
+
+ public Kafka010TableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+ super(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer010<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
deleted file mode 100644
index 4a86016..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka010TableSource}.
- */
-public class Kafka010TableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String kafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
- }
-
- @Override
- protected boolean supportsKafkaTimestamps() {
- return true;
- }
-
- @Override
- protected KafkaTableSource createKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka010TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
new file mode 100644
index 0000000..0cf9499
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010TableSource} and {@link Kafka010TableSink}.
+ */
+public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return true;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka010TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ @Override
+ protected KafkaTableSink createKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka010TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 21f5707..9bb0363 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index af562c6..339420c 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
/**
* Tests for the {@link Kafka010JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sinks.
*/
+@Deprecated
public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
deleted file mode 100644
index ff3b0b0..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka010TableSource} created by {@link Kafka010TableSourceFactory}.
- */
-public class Kafka010TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
- @Override
- protected String getKafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer010.class;
- }
-
- @Override
- protected KafkaTableSource getExpectedKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka010TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..cc198c9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Verifies the {@link Kafka010TableSource} and {@link Kafka010TableSink} instances
+ * produced by {@link Kafka010TableSourceSinkFactory}.
+ */
+public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	/** Returns {@link KafkaValidator#CONNECTOR_VERSION_VALUE_010}. */
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+	}
+
+	/**
+	 * Names {@link FlinkKafkaConsumer010} as the expected consumer class (via a raw cast).
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer010.class;
+	}
+
+	/** Names {@link FlinkKafkaProducer010} as the expected producer class. */
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer010.class;
+	}
+
+	/**
+	 * Builds the reference {@link Kafka010TableSource}; the field mapping is wrapped in
+	 * an {@link Optional}, all other arguments are passed through unchanged.
+	 */
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		final Optional<Map<String, String>> mapping = Optional.of(fieldMapping);
+		return new Kafka010TableSource(
+			schema, proctimeAttribute, rowtimeAttributeDescriptors, mapping,
+			topic, properties, deserializationSchema, startupMode, specificStartupOffsets);
+	}
+
+	/**
+	 * Builds the reference {@link Kafka010TableSink} from the arguments as given.
+	 */
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka010TableSink(
+			schema, topic, properties, partitioner, serializationSchema);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
new file mode 100644
index 0000000..22c6da1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Table sink that writes rows to Kafka 0.11 through a {@link FlinkKafkaProducer011}.
+ */
+@Internal
+public class Kafka011TableSink extends KafkaTableSink {
+
+	/** Passes all arguments on to the {@link KafkaTableSink} constructor unchanged. */
+	public Kafka011TableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+		super(schema, topic, properties, partitioner, serializationSchema);
+	}
+
+	/**
+	 * Creates the {@link FlinkKafkaProducer011} used by this sink.
+	 *
+	 * <p>The serialization schema is adapted with a {@link KeyedSerializationSchemaWrapper}
+	 * and the partitioner is supplied wrapped in {@link Optional#of(Object)}.
+	 */
+	@Override
+	protected SinkFunction<Row> createKafkaProducer(
+			String topic,
+			Properties properties,
+			SerializationSchema<Row> serializationSchema,
+			FlinkKafkaPartitioner<Row> partitioner) {
+		final KeyedSerializationSchemaWrapper<Row> keyedSchema =
+			new KeyedSerializationSchemaWrapper<>(serializationSchema);
+		return new FlinkKafkaProducer011<>(topic, keyedSchema, properties, Optional.of(partitioner));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 85f5669..a646317 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -58,7 +58,8 @@ public class Kafka011TableSource extends KafkaTableSource {
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
- String topic, Properties properties,
+ String topic,
+ Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
deleted file mode 100644
index b1e3929..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka011TableSource}.
- */
-public class Kafka011TableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String kafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
- }
-
- @Override
- protected boolean supportsKafkaTimestamps() {
- return true;
- }
-
- @Override
- protected KafkaTableSource createKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka011TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
new file mode 100644
index 0000000..c26df42
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011TableSource} and {@link Kafka011TableSink}.
+ */
+public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+	/** Returns {@link KafkaValidator#CONNECTOR_VERSION_VALUE_011}. */
+	@Override
+	protected String kafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	/** Always returns {@code true} for this factory. */
+	@Override
+	protected boolean supportsKafkaTimestamps() {
+		return true;
+	}
+
+	/**
+	 * Builds a {@link Kafka011TableSource} from the given arguments.
+	 *
+	 * <p>The field mapping is handed to the source wrapped in an {@link Optional}; all
+	 * other arguments are passed through unchanged.
+	 */
+	@Override
+	protected KafkaTableSource createKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		final Optional<Map<String, String>> mapping = Optional.of(fieldMapping);
+		return new Kafka011TableSource(
+			schema, proctimeAttribute, rowtimeAttributeDescriptors, mapping,
+			topic, properties, deserializationSchema, startupMode, specificStartupOffsets);
+	}
+
+	/**
+	 * Builds a {@link Kafka011TableSink}; every argument is forwarded as given.
+	 */
+	@Override
+	protected KafkaTableSink createKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka011TableSink(schema, topic, properties, partitioner, serializationSchema);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c056097..b59b4a7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
deleted file mode 100644
index abaa490..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka011TableSource} created by {@link Kafka011TableSourceFactory}.
- */
-public class Kafka011TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
- @Override
- protected String getKafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer011.class;
- }
-
- @Override
- protected KafkaTableSource getExpectedKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka011TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..996c508
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Verifies the {@link Kafka011TableSource} and {@link Kafka011TableSink} instances
+ * produced by {@link Kafka011TableSourceSinkFactory}.
+ */
+public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+	/** Returns {@link KafkaValidator#CONNECTOR_VERSION_VALUE_011}. */
+	@Override
+	protected String getKafkaVersion() {
+		return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+	}
+
+	/**
+	 * Names {@link FlinkKafkaConsumer011} as the expected consumer class (via a raw cast).
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+
+	/** Names {@link FlinkKafkaProducer011} as the expected producer class. */
+	@Override
+	protected Class<?> getExpectedFlinkKafkaProducer() {
+		return FlinkKafkaProducer011.class;
+	}
+
+	/**
+	 * Builds the reference {@link Kafka011TableSource}; the field mapping is wrapped in
+	 * an {@link Optional}, all other arguments are passed through unchanged.
+	 */
+	@Override
+	protected KafkaTableSource getExpectedKafkaTableSource(
+			TableSchema schema,
+			Optional<String> proctimeAttribute,
+			List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+			Map<String, String> fieldMapping,
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+		final Optional<Map<String, String>> mapping = Optional.of(fieldMapping);
+		return new Kafka011TableSource(
+			schema, proctimeAttribute, rowtimeAttributeDescriptors, mapping,
+			topic, properties, deserializationSchema, startupMode, specificStartupOffsets);
+	}
+
+	/**
+	 * Builds the reference {@link Kafka011TableSink} from the arguments as given.
+	 */
+	@Override
+	protected KafkaTableSink getExpectedKafkaTableSink(
+			TableSchema schema,
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
+			SerializationSchema<Row> serializationSchema) {
+
+		return new Kafka011TableSink(
+			schema, topic, properties, partitioner, serializationSchema);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index c60288d..45588cd 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -18,20 +18,25 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
/**
@@ -48,7 +53,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka08JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
@@ -60,7 +67,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@@ -84,7 +93,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+ return new FlinkKafkaProducer08<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
new file mode 100644
index 0000000..c34de13
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.8 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka08TableSink extends KafkaTableSink {
+
+ public Kafka08TableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+ super(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer08<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 1a025b8..97c293e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -58,7 +58,8 @@ public class Kafka08TableSource extends KafkaTableSource {
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
- String topic, Properties properties,
+ String topic,
+ Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
deleted file mode 100644
index cd33751..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka08TableSource}.
- */
-public class Kafka08TableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String kafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
- }
-
- @Override
- protected boolean supportsKafkaTimestamps() {
- return false;
- }
-
- @Override
- protected KafkaTableSource createKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka08TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
new file mode 100644
index 0000000..3e93b6f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08TableSource} and {@link Kafka08TableSink}.
+ */
+public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka08TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ @Override
+ protected KafkaTableSink createKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka08TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b83bb3f..f2e1c3f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 53da9f6..32bd3b6 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
/**
* Tests for the {@link Kafka08JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sinks.
*/
+@Deprecated
public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
deleted file mode 100644
index d939d88..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka08TableSource} created by {@link Kafka08TableSourceFactory}.
- */
-public class Kafka08TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
- @Override
- protected String getKafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer08.class;
- }
-
- @Override
- protected KafkaTableSource getExpectedKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka08TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..b67501e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka08TableSource} and {@link Kafka08TableSink} created
+ * by {@link Kafka08TableSourceSinkFactory}.
+ */
+public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+ @Override
+ protected String getKafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer08.class;
+ }
+
+ @Override
+ protected Class<?> getExpectedFlinkKafkaProducer() {
+ return FlinkKafkaProducer08.class;
+ }
+
+ @Override
+ protected KafkaTableSource getExpectedKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka08TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+
+ @Override
+ protected KafkaTableSink getExpectedKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka08TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 95ce4e6..b3cc0aa 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -18,20 +18,25 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
-@PublicEvolving
+@Deprecated
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
/**
@@ -48,7 +53,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka09JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
@@ -60,7 +67,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use table descriptors instead of implementation-specific classes.
*/
+ @Deprecated
public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@@ -84,7 +93,11 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+ return new FlinkKafkaProducer09<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
new file mode 100644
index 0000000..8c349d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.9 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka09TableSink extends KafkaTableSink {
+
+ public Kafka09TableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+ super(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer09<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 18bc1c4..8f9e799 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -58,7 +58,8 @@ public class Kafka09TableSource extends KafkaTableSource {
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
- String topic, Properties properties,
+ String topic,
+ Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
deleted file mode 100644
index 14c52fd..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka09TableSource}.
- */
-public class Kafka09TableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String kafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
- }
-
- @Override
- protected boolean supportsKafkaTimestamps() {
- return false;
- }
-
- @Override
- protected KafkaTableSource createKafkaTableSource(
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
- return new Kafka09TableSource(
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- Optional.of(fieldMapping),
- topic,
- properties,
- deserializationSchema,
- startupMode,
- specificStartupOffsets);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
new file mode 100644
index 0000000..9958b4e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09TableSource} and {@link Kafka09TableSink}.
+ */
+public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+ @Override
+ protected String kafkaVersion() {
+ return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+ }
+
+ @Override
+ protected boolean supportsKafkaTimestamps() {
+ return false;
+ }
+
+ @Override
+ protected KafkaTableSource createKafkaTableSource(
+ TableSchema schema,
+ Optional<String> proctimeAttribute,
+ List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+ Map<String, String> fieldMapping,
+ String topic,
+ Properties properties,
+ DeserializationSchema<Row> deserializationSchema,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+ return new Kafka09TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets);
+ }
+
+ @Override
+ protected KafkaTableSink createKafkaTableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+
+ return new Kafka09TableSink(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index fb14ddb..2625873 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3cde8/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 610e048..79f251b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
/**
* Tests for the {@link Kafka09JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sinks.
*/
+@Deprecated
public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override