Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/07/03 14:18:00 UTC
[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072510#comment-16072510 ]
ASF GitHub Bot commented on FLINK-6996:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4206#discussion_r125295964
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
@@ -172,6 +194,118 @@ public void cancel() {
}
}
+ /**
+ * Tests the at-least-once semantics when the producer is used as a regular sink function.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ testOneToOneAtLeastOnce(true);
+ }
+
+ /**
+ * Tests the at-least-once semantics when the producer is used as a custom operator.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ testOneToOneAtLeastOnce(false);
+ }
+
+ /**
+ * This test configures KafkaProducer so that it will not automatically flush the data,
+ * then fails the broker to check whether FlinkKafkaProducer flushed the records manually on snapshotState.
+ */
+ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+ final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+ final int partition = 0;
+ final int numElements = 1000;
+ final int failAfterElements = 333;
+
+ createTestTopic(topic, 1, 1);
+
+ TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(500);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not yet flushed) data on close()
+ properties.setProperty("timeout.ms", "10000");
+ properties.setProperty("max.block.ms", "10000");
+ // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+ properties.setProperty("batch.size", "10240000");
+ properties.setProperty("linger.ms", "10000");
+
+ int leaderId = kafkaServer.getLeaderToShutDown(topic);
+ BrokerRestartingMapper.resetState();
+
+ // process exactly failAfterElements elements, then shut down the Kafka broker and fail the application
+ DataStream<Integer> inputStream = env
+ .fromCollection(getIntegersSequence(numElements))
+ .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+ StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+
+ if (regularSink) {
+ inputStream.addSink(kafkaSink.getUserFunction());
+ }
+ else {
+ kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+ }
+
+ FailingIdentityMapper.failedBefore = false;
--- End diff --
Why do we need this here? I don't see the `FailingIdentityMapper` being used anywhere in this pipeline.
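The mechanism the test relies on can be modelled without Kafka or Flink. The following is a hypothetical simulation, not part of the pull request: `BufferedClient` stands in for a KafkaProducer whose large `batch.size` and `linger.ms` keep records in a client-side buffer, `crash()` stands in for shutting down the leader broker, and checkpoints are taken every N records rather than every 500 ms. At-least-once holds only if every record covered by the last completed checkpoint was acknowledged before the failure, because those records are not replayed after recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a KafkaProducer configured with a huge batch.size and linger.ms:
// records stay in a client-side buffer until flush() is called explicitly.
class BufferedClient {
    private final List<Integer> pending = new ArrayList<>();
    private final List<Integer> acknowledged = new ArrayList<>(); // records the "broker" has persisted

    void send(int record) {
        pending.add(record);
    }

    void flush() {
        acknowledged.addAll(pending);
        pending.clear();
    }

    // Simulates shutting down the broker: anything still buffered client-side is lost.
    void crash() {
        pending.clear();
    }

    List<Integer> acknowledged() {
        return acknowledged;
    }
}

// Hypothetical sink; flushOnSnapshot toggles the behaviour FLINK-6996 is about.
class SimulatedSink {
    final BufferedClient client = new BufferedClient();
    private final boolean flushOnSnapshot;

    SimulatedSink(boolean flushOnSnapshot) {
        this.flushOnSnapshot = flushOnSnapshot;
    }

    void invoke(int value) {
        client.send(value);
    }

    void snapshotState() {
        if (flushOnSnapshot) {
            client.flush();
        }
    }
}

class AtLeastOnceCheck {
    // Writes records 0..numElements-1, checkpoints every checkpointInterval records and
    // crashes once failAfterElements records have been written. Returns true if every
    // record covered by the last completed checkpoint was acknowledged.
    static boolean holds(boolean flushOnSnapshot, int numElements, int failAfterElements, int checkpointInterval) {
        SimulatedSink sink = new SimulatedSink(flushOnSnapshot);
        int lastCheckpointed = 0; // records covered by the last completed checkpoint
        for (int i = 0; i < numElements; i++) {
            if (i == failAfterElements) {
                sink.client.crash();
                break;
            }
            sink.invoke(i);
            if ((i + 1) % checkpointInterval == 0) {
                sink.snapshotState();
                lastCheckpointed = i + 1;
            }
        }
        // A restore would resume after the last checkpoint, so earlier records are never replayed.
        List<Integer> acked = sink.client.acknowledged();
        for (int i = 0; i < lastCheckpointed; i++) {
            if (!acked.contains(i)) {
                return false;
            }
        }
        return true;
    }
}
```

With `holds(false, 1000, 333, 100)` the last checkpoint covers 300 records but none of them were ever flushed, so the check fails; with `holds(true, 1000, 333, 100)` those 300 records were flushed at snapshot time and the check passes.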
> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This means that when it is used as a "regular sink function" (option a in [the javadoc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]), it does not flush pending data on "snapshotState" as it is supposed to.
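Why a missing interface leads to missing flushes can be shown with a simplified model. This sketch assumes, as the issue description implies, that the runtime only calls `snapshotState` on user functions that implement the checkpointing interface. `SimpleSinkFunction`, `SimpleCheckpointedFunction`, `SimpleSinkOperator` and the sink classes are hypothetical stand-ins, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Flink's SinkFunction and CheckpointedFunction.
interface SimpleSinkFunction<T> {
    void invoke(T value);
}

interface SimpleCheckpointedFunction {
    void snapshotState(long checkpointId);
}

// Simplified operator: forwards records, and on a checkpoint notifies the user function
// only if it implements the checkpointing interface.
class SimpleSinkOperator<T> {
    private final SimpleSinkFunction<T> userFunction;

    SimpleSinkOperator(SimpleSinkFunction<T> userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(T value) {
        userFunction.invoke(value);
    }

    void snapshotState(long checkpointId) {
        if (userFunction instanceof SimpleCheckpointedFunction) {
            ((SimpleCheckpointedFunction) userFunction).snapshotState(checkpointId);
        }
        // Otherwise the function is silently skipped and its buffer is left untouched.
    }
}

// Buffers records client-side until flush() is called.
abstract class BufferingSink implements SimpleSinkFunction<String> {
    final List<String> buffered = new ArrayList<>();
    final List<String> flushed = new ArrayList<>();

    @Override
    public void invoke(String value) {
        buffered.add(value);
    }

    void flush() {
        flushed.addAll(buffered);
        buffered.clear();
    }
}

// Does not declare the checkpointing interface, so checkpoints never trigger a flush.
class PlainBufferingSink extends BufferingSink {
}

// Declares the interface and flushes on every snapshot.
class CheckpointedBufferingSink extends BufferingSink implements SimpleCheckpointedFunction {
    @Override
    public void snapshotState(long checkpointId) {
        flush();
    }
}
```

A `PlainBufferingSink` has the same `flush()` method as `CheckpointedBufferingSink`, but because it does not declare the interface, the operator skips it on every checkpoint and its records stay buffered, which is the situation the issue describes for the regular-sink usage.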
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)