You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/07/23 07:10:32 UTC

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/6387

     [FLINK-9846] [table] Add a Kafka table sink factory

    ## What is the purpose of the change
    
    This PR adds a Kafka table sink factory with format discovery. Currently, this enable the SQL Client to write Avro and JSON data to Kafka. The functionality is limited due to FLINK-9870. Therefore, it is currently not possible to use time attributes in the output.
    
    ## Brief change log
    
    - Decouple Kafka sink from formats and deprecate old classes
    - Add a Kafka table sink factory
    
    ## Verifying this change
    
    Existing tests for the `KafkaTableSourceFactory` have been generalized to support sinks as well.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? not documented yet


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-9846

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6387
    
----
commit d94bb86b2b6e08d711c30c7da7fc2c521c17d4ec
Author: Timo Walther <tw...@...>
Date:   2018-07-23T06:12:00Z

    [FLINK-9846] [table] Update KafkaTableSink class structure

commit 7328bb2a45070201b14500106af503e6519f1056
Author: Timo Walther <tw...@...>
Date:   2018-07-20T10:35:57Z

    [FLINK-9846] [table] Refactor KafkaTableSourceFactory to KafkaTableSourceSinkFactory

commit bfdac8f9a5950c6df882ac44fbd7d5e8358658c2
Author: Timo Walther <tw...@...>
Date:   2018-07-23T04:47:58Z

    [FLINK-9846] [table] Add a Kafka table sink factory

----


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204390897
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java ---
    @@ -58,14 +65,23 @@ public Kafka010JsonTableSink(String topic, Properties properties) {
     	 * @param topic topic in Kafka to which table is written
     	 * @param properties properties to connect to Kafka
     	 * @param partitioner Kafka partitioner
    +	 * @deprecated Use table descriptors instead of implementation-specific classes.
     	 */
    +	@Deprecated
     	public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
     		super(topic, properties, partitioner);
     	}
     
     	@Override
     	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
    -		return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer010<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    Good point! It was explicitly set in the table source but recently changed in FLINK-5728. Will drop this.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204351288
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ---
    @@ -125,89 +131,47 @@
     
     	@Override
     	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    -		final DescriptorProperties params = new DescriptorProperties(true);
    -		params.putProperties(properties);
    +		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
    --- End diff --
    
    +1 for this changes here :)


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204342252
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.10 table sink for writing data into Kafka.
    + */
    +@Internal
    +public class Kafka010TableSink extends KafkaTableSink {
    +
    +	/**
    +	 * Creates a Kafka 0.10 table sink.
    +	 *
    +	 * @param schema              The schema of the table.
    +	 * @param topic               Kafka topic to write to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param partitioner         Partitioner to select Kafka partition for each item.
    +	 * @param serializationSchema Serialization schema for encoding records to Kafka.
    +	 */
    +	public Kafka010TableSink(
    +			TableSchema schema,
    +			String topic,
    +			Properties properties,
    +			FlinkKafkaPartitioner<Row> partitioner,
    +			SerializationSchema<Row> serializationSchema) {
    +		super(
    +			schema,
    +			topic,
    +			properties,
    +			partitioner,
    +			serializationSchema);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			FlinkKafkaPartitioner<Row> partitioner) {
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer010<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204347924
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -40,27 +45,68 @@
     @Internal
     public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
     
    +	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
    +
    +	/** The schema of the table. */
    +	private final Optional<TableSchema> schema;
    +
    +	/** The Kafka topic to write to. */
     	protected final String topic;
    +
    +	/** Properties for the Kafka producer. */
     	protected final Properties properties;
    -	protected SerializationSchema<Row> serializationSchema;
    +
    +	/** Serialization schema for encoding records to Kafka. */
    +	protected Optional<SerializationSchema<Row>> serializationSchema;
    +
    +	/** Partitioner to select Kafka partition for each item. */
     	protected final FlinkKafkaPartitioner<Row> partitioner;
    +
    +	// legacy variables
     	protected String[] fieldNames;
     	protected TypeInformation[] fieldTypes;
     
    +	/**
    --- End diff --
    
    Please drop this java doc or fields java docs.
    1. This is not public method
    2. it duplicates the docs of the fields
    3. fields/params are on their own self explanatory (I have no problem understanding them on their own)
    
    For that matter I would drop both of those java docs, but keeping two of them is an overkill


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204348635
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -82,49 +129,97 @@ public KafkaTableSink(
     	 *
     	 * @param rowSchema the schema of the row to serialize.
     	 * @return Instance of serialization schema
    +	 * @deprecated Use the constructor to pass a serialization schema instead.
     	 */
    -	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
    +	@Deprecated
    +	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
    +		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +	}
     
     	/**
     	 * Create a deep copy of this sink.
     	 *
     	 * @return Deep copy of this sink
     	 */
    -	protected abstract KafkaTableSink createCopy();
    +	@Deprecated
    +	protected KafkaTableSink createCopy() {
    +		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +	}
     
     	@Override
     	public void emitDataStream(DataStream<Row> dataStream) {
    -		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
    -		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    -		kafkaProducer.setFlushOnCheckpoint(true);
    +		SinkFunction<Row> kafkaProducer = createKafkaProducer(
    +			topic,
    +			properties,
    +			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
    +			partitioner);
     		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
     	}
     
     	@Override
     	public TypeInformation<Row> getOutputType() {
    -		return new RowTypeInfo(getFieldTypes());
    +		return schema
    +			.map(TableSchema::toRowType)
    +			.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
     	}
     
     	public String[] getFieldNames() {
    -		return fieldNames;
    +		return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
     	}
     
     	@Override
     	public TypeInformation<?>[] getFieldTypes() {
    -		return fieldTypes;
    +		return schema.map(TableSchema::getTypes).orElse(fieldTypes);
     	}
     
     	@Override
     	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    +		// a fixed schema is defined so reconfiguration is not supported
    --- End diff --
    
    Move this comment to exception description.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6387


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204394099
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -40,27 +45,68 @@
     @Internal
     public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
     
    +	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
    +
    +	/** The schema of the table. */
    +	private final Optional<TableSchema> schema;
    +
    +	/** The Kafka topic to write to. */
     	protected final String topic;
    +
    +	/** Properties for the Kafka producer. */
     	protected final Properties properties;
    -	protected SerializationSchema<Row> serializationSchema;
    +
    +	/** Serialization schema for encoding records to Kafka. */
    +	protected Optional<SerializationSchema<Row>> serializationSchema;
    +
    +	/** Partitioner to select Kafka partition for each item. */
     	protected final FlinkKafkaPartitioner<Row> partitioner;
    +
    +	// legacy variables
     	protected String[] fieldNames;
     	protected TypeInformation[] fieldTypes;
     
    +	/**
    --- End diff --
    
    Having more comments doesn't harm. It is also done in other internal classes such as `org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase`. With more options that might be exposed in the future from the underlying producer this might become more important.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204346197
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---
    @@ -84,7 +93,14 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione
     
     	@Override
     	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
    -		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer08<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204346083
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import java.util.Optional;
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.11 table sink for writing data into Kafka.
    + */
    +@Internal
    +public class Kafka011TableSink extends KafkaTableSink {
    +
    +	/**
    +	 * Creates a Kafka 0.11 table sink.
    +	 *
    +	 * @param schema              The schema of the table.
    +	 * @param topic               Kafka topic to write to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param partitioner         Partitioner to select Kafka partition for each item.
    +	 * @param serializationSchema Serialization schema for encoding records to Kafka.
    +	 */
    +	public Kafka011TableSink(
    +			TableSchema schema,
    +			String topic,
    +			Properties properties,
    +			FlinkKafkaPartitioner<Row> partitioner,
    +			SerializationSchema<Row> serializationSchema) {
    +		super(
    +			schema,
    +			topic,
    +			properties,
    +			partitioner,
    +			serializationSchema);
    +	}
    +
    +	@Override
    +	protected SinkFunction<Row> createKafkaProducer(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			FlinkKafkaPartitioner<Row> partitioner) {
    +		return new FlinkKafkaProducer011<>(
    --- End diff --
    
    here on the other hand you are relaying on default value for `at-least-once`.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204342102
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java ---
    @@ -58,14 +65,23 @@ public Kafka010JsonTableSink(String topic, Properties properties) {
     	 * @param topic topic in Kafka to which table is written
     	 * @param properties properties to connect to Kafka
     	 * @param partitioner Kafka partitioner
    +	 * @deprecated Use table descriptors instead of implementation-specific classes.
     	 */
    +	@Deprecated
     	public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
     		super(topic, properties, partitioner);
     	}
     
     	@Override
     	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
    -		return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer010<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    1. is this relevant to the pr/commit?
    2. this value is set to true by default, thus please either drop this or add `checkState`.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204406421
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
    @@ -40,27 +45,68 @@
     @Internal
     public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
     
    +	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
    +
    +	/** The schema of the table. */
    +	private final Optional<TableSchema> schema;
    +
    +	/** The Kafka topic to write to. */
     	protected final String topic;
    +
    +	/** Properties for the Kafka producer. */
     	protected final Properties properties;
    -	protected SerializationSchema<Row> serializationSchema;
    +
    +	/** Serialization schema for encoding records to Kafka. */
    +	protected Optional<SerializationSchema<Row>> serializationSchema;
    +
    +	/** Partitioner to select Kafka partition for each item. */
     	protected final FlinkKafkaPartitioner<Row> partitioner;
    +
    +	// legacy variables
     	protected String[] fieldNames;
     	protected TypeInformation[] fieldTypes;
     
    +	/**
    --- End diff --
    
    More comments can harm in three ways:
    1. comments can (and very often do) become obsolete, since there is no way like tests/compile errors to make sure that they stay up to date. Especially true for same comments duplicated in many places 
    2. more lines of code to support in case of refactoring/renaming/extending
    3. minor issue of more clogged code and more lines of code to read
     
    there is a full chapter about this in clean code:
    http://apdevblog.com/comments-in-code/


---

[GitHub] flink issue #6387: [FLINK-9846] [table] Add a Kafka table sink factory

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6387
  
    Thanks for the review @pnowojski. I will merge this once Travis gives green light.


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204346742
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.9 table sink for writing data into Kafka.
    + */
    +@Internal
    +public class Kafka09TableSink extends KafkaTableSink {
    +
    +	/**
    +	 * Creates a Kafka 0.11 table sink.
    +	 *
    +	 * @param schema              The schema of the table.
    +	 * @param topic               Kafka topic to write to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param partitioner         Partitioner to select Kafka partition for each item.
    +	 * @param serializationSchema Serialization schema for encoding records to Kafka.
    +	 */
    +	public Kafka09TableSink(
    +			TableSchema schema,
    +			String topic,
    +			Properties properties,
    +			FlinkKafkaPartitioner<Row> partitioner,
    +			SerializationSchema<Row> serializationSchema) {
    +		super(
    +			schema,
    +			topic,
    +			properties,
    +			partitioner,
    +			serializationSchema);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			FlinkKafkaPartitioner<Row> partitioner) {
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer09<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204346382
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.types.Row;
    +
    +import java.util.Properties;
    +
    +/**
    + * Kafka 0.8 table sink for writing data into Kafka.
    + */
    +@Internal
    +public class Kafka08TableSink extends KafkaTableSink {
    +
    +	/**
    +	 * Creates a Kafka 0.8 table sink.
    +	 *
    +	 * @param schema              The schema of the table.
    +	 * @param topic               Kafka topic to write to.
    +	 * @param properties          Properties for the Kafka producer.
    +	 * @param partitioner         Partitioner to select Kafka partition for each item.
    +	 * @param serializationSchema Serialization schema for encoding records to Kafka.
    +	 */
    +	public Kafka08TableSink(
    +			TableSchema schema,
    +			String topic,
    +			Properties properties,
    +			FlinkKafkaPartitioner<Row> partitioner,
    +			SerializationSchema<Row> serializationSchema) {
    +		super(
    +			schema,
    +			topic,
    +			properties,
    +			partitioner,
    +			serializationSchema);
    +	}
    +
    +	@Override
    +	protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    +			String topic,
    +			Properties properties,
    +			SerializationSchema<Row> serializationSchema,
    +			FlinkKafkaPartitioner<Row> partitioner) {
    +		final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer08<>(
    +			topic,
    +			serializationSchema,
    +			properties,
    +			partitioner);
    +		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
    --- End diff --
    
    ditto?


---

[GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204351776
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ---
    @@ -125,89 +131,47 @@
     
     	@Override
     	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    -		final DescriptorProperties params = new DescriptorProperties(true);
    -		params.putProperties(properties);
    +		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
     
    -		// validate
    -		// allow Kafka timestamps to be used, watermarks can not be received from source
    -		new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
    -		new KafkaValidator().validate(params);
    -
    -		// deserialization schema using format discovery
    -		final DeserializationSchemaFactory<?> formatFactory = TableFactoryService.find(
    -			DeserializationSchemaFactory.class,
    -			properties,
    -			this.getClass().getClassLoader());
    -		@SuppressWarnings("unchecked")
    -		final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
    -			.createDeserializationSchema(properties);
    -
    -		// schema
    -		final TableSchema schema = params.getTableSchema(SCHEMA());
    -
    -		// proctime
    -		final Optional<String> proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params);
    -
    -		// rowtime
    -		final List<RowtimeAttributeDescriptor> rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(params);
    -
    -		// field mapping
    -		final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(params, Optional.of(schema));
    +		final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
    +		final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
    +		final Tuple2<StartupMode, Map<KafkaTopicPartition, Long>> startupOptions =
    --- End diff --
    
    nit: using tuples in such places slightly reduces code readability, since it's loosing named variables. I would personally prefer introducing small Pojo with named fields:
    ```
    private static class StartupOptions {
      private final StartupMode startupMode;
      private final Map<...> specificOptions;
    }
    ```
    otherwise `specificOptions` name disappears.


---