Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/06/28 07:02:36 UTC

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4206

    [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink at-least-once

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4206
    
----
commit b05b72a2baab8656787e2020120750e780b37621
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-06-26T09:28:51Z

    [FLINK-6996] Refactor and automatically inherit KafkaProducer integration tests

commit 62b553503964230d8af6d7d79054721060da8061
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-06-26T10:20:36Z

    [FLINK-6996] Fix formatting in KafkaConsumerTestBase and KafkaProducerTestBase

commit 34ba4b74f0c5c6b915695ab8bf7bda5b40955d5b
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-06-26T10:36:40Z

    [FLINK-6996] Fix at-least-once semantic for FlinkKafkaProducer010
    
    Add test coverage for Kafka 0.10 and 0.9

----


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124467929
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -28,6 +28,7 @@
     import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
     import org.apache.flink.util.NetUtils;
     
    +import com.google.common.collect.ImmutableList;
    --- End diff --
    
    Avoid Guava


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124467541
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java ---
    @@ -18,17 +18,19 @@
     
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.junit.Test;
    -
     /**
      * IT cases for the {@link FlinkKafkaProducer08}.
      */
     @SuppressWarnings("serial")
     public class Kafka08ProducerITCase extends KafkaProducerTestBase {
     
    -	@Test
    -	public void testCustomPartitioning() {
    -		runCustomPartitioningTest();
    +	@Override
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
     	}
     
    +	@Override
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		// Disable this test since FlinkKafka08Producer doesn't support writing timestamps
    --- End diff --
    
    I would perhaps rephrase this comment a bit:
    it's disabled because FlinkKafka08Producer doesn't run in the custom operator mode (to be coherent with the test case name)


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124476235
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -116,6 +120,30 @@ public String getVersion() {
     	}
     
     	@Override
    +	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
    +		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
    +		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
    +		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
    +
    +		while (true) {
    +			boolean processedAtLeastOneRecord = false;
    --- End diff --
    
    No, it's the other way around: we break out of the loop if a 1 second poll for the next records comes back empty. Added a comment.
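
    The diff above cuts the method off; a minimal sketch of how such a drain-until-empty-poll loop can look (the loop body below is reconstructed from the visible lines and this comment, not copied from the patch):

    ```java
    @Override
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
    	ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
    	KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
    	consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));

    	while (true) {
    		boolean processedAtLeastOneRecord = false;

    		// poll with a 1 second timeout; an empty response means the partition is drained
    		Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(1000).iterator();
    		while (iterator.hasNext()) {
    			result.add(iterator.next());
    			processedAtLeastOneRecord = true;
    		}

    		if (!processedAtLeastOneRecord) {
    			// the 1 second poll returned nothing - stop reading
    			break;
    		}
    	}

    	consumer.close();
    	return result.build();
    }
    ```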


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    Sorry, those tests were passing before the rebase - after the rebase I accidentally reverted this earlier fixup. I have re-introduced it now as a separate commit.


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    @tzulitai applied changes. 


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125306972
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +194,118 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
     +	 * This test sets KafkaProducer so that it will not automatically flush the data
     +	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    --- End diff --
    
    This is a static variable and `FailingIdentityMapper` is used twice: first to test the regular sink and then the custom sink operator. Without resetting this state, the second test run would fail.


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125411707
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +194,118 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
     +	 * This test sets KafkaProducer so that it will not automatically flush the data
     +	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    --- End diff --
    
    I see. Perhaps we can make this more explicit by following the same pattern as `BrokerRestartingMapper.resetState()`?
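
    A hypothetical sketch of that pattern applied to `FailingIdentityMapper` (names inferred from the test base, not taken from the patch):

    ```java
    public class FailingIdentityMapper<T> extends RichMapFunction<T, T> {

    	public static volatile boolean failedBefore;

    	// mirrors BrokerRestartingMapper.resetState(): reset the static flag through an
    	// explicit method instead of assigning the field inline in every test
    	public static void resetState() {
    		failedBefore = false;
    	}

    	// ... map() and the failure-injection logic stay as in the existing test class ...
    }
    ```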


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124467315
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java ---
    @@ -18,17 +18,19 @@
     
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.junit.Test;
    -
     /**
      * IT cases for the {@link FlinkKafkaProducer08}.
      */
     @SuppressWarnings("serial")
     public class Kafka08ProducerITCase extends KafkaProducerTestBase {
     
    -	@Test
    -	public void testCustomPartitioning() {
    -		runCustomPartitioningTest();
    +	@Override
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
    --- End diff --
    
    Is this a pending fix?


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    LGTM, merging 👍 


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124468424
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -116,6 +120,30 @@ public String getVersion() {
     	}
     
     	@Override
    +	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition) {
    +		ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
    +		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
    +		consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
    +
    +		while (true) {
    +			boolean processedAtLeastOneRecord = false;
    --- End diff --
    
    I'm a bit confused by this flag.
    The method name is `getAllRecordsFromTopic`, but it seems like we're escaping the loop once some record is fetched.


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    Thanks for the fixups @pnowojski!
    I have some final minor comments, other than that this LGTM.


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    @pnowojski the related test `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` is failing on Travis. It seems like the records fetched from Kafka are empty?
    
    ```
    Failed tests: 
      Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink:202->KafkaProducerTestBase.testOneToOneAtLeastOnce:282->KafkaProducerTestBase.assertAtLeastOnceForTopic:298 expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180,
  181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269]> but was:<[]>
    ```


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125416757
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +195,144 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
     +	 * This test sets KafkaProducer so that it will not automatically flush the data
     +	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    +		try {
    +			env.execute("One-to-one at least once test");
    +			fail("Job should fail!");
    +		}
    +		catch (Exception ex) {
    --- End diff --
    
    FYI the exception returned by `getCause` is of type `java.lang.Exception`, so there is no point in making an assertion on it.


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125295964
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +194,118 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
     +	 * This test sets KafkaProducer so that it will not automatically flush the data
     +	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    --- End diff --
    
    Why do we need this here? I don't see that the `FailingIdentityMapper` is used elsewhere in the pipeline.


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    It seems like the tests are still failing on Travis:
    
    >Failed tests: 
      Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink:202->KafkaProducerTestBase.testOneToOneAtLeastOnce:282->KafkaProducerTestBase.assertAtLeastOnceForTopic:298 expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180,
  181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331]> but was:<[]>


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125411227
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +195,144 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
     +	 * This test sets KafkaProducer so that it will not automatically flush the data
     +	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    +		try {
    +			env.execute("One-to-one at least once test");
    +			fail("Job should fail!");
    +		}
    +		catch (Exception ex) {
    --- End diff --
    
    I think we need a more specific exception here.
    There may be actual exceptions thrown by Flink that would be masked by this assumption.
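
    One way to do that without relying on the static type of `getCause()` - the helper below is illustrative, not part of the PR - is to walk the cause chain for a known marker exception:

    ```java
    // returns true if any exception in the cause chain is of the expected type
    private static boolean containsCause(Throwable t, Class<? extends Throwable> expected) {
    	for (Throwable cause = t; cause != null; cause = cause.getCause()) {
    		if (expected.isInstance(cause)) {
    			return true;
    		}
    	}
    	return false;
    }
    ```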


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124466733
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -26,6 +26,7 @@
     import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
     import org.apache.flink.util.NetUtils;
     
    +import com.google.common.collect.ImmutableList;
    --- End diff --
    
    In Flink we usually try to avoid Guava usages. Would it be easy to switch to `Collections.unmodifiableList`?
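
    For illustration, a JDK-only equivalent of the Guava usages in this diff could look roughly like this:

    ```java
    // Guava: consumer.assign(ImmutableList.of(new TopicPartition(topic, partition)));
    consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));

    // Guava: ImmutableList.Builder<ConsumerRecord<K, V>> result = ImmutableList.builder();
    List<ConsumerRecord<K, V>> result = new ArrayList<>();
    // ... result.add(record) while polling ...
    // Guava: return result.build();
    return Collections.unmodifiableList(result);
    ```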


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    I wonder if we can actually replace this validation step with a validating map function after the Kafka producer sink, e.g. use a Flink Kafka consumer to read back the results, followed by a validating flat map function?
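
    A rough sketch of that idea - every name and overload below is an assumption for illustration, not code from this PR:

    ```java
    DataStream<Integer> readBack = env.addSource(
    	kafkaServer.getConsumer(Collections.singletonList(topic), schema, properties));

    readBack.flatMap(new FlatMapFunction<Integer, Void>() {
    	private final BitSet seen = new BitSet(numElements);

    	@Override
    	public void flatMap(Integer value, Collector<Void> out) throws Exception {
    		seen.set(value);
    		if (seen.cardinality() == numElements) {
    			// every expected element was read back at least once
    			throw new SuccessException();
    		}
    	}
    });
    ```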


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124476992
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java ---
    @@ -18,17 +18,19 @@
     
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.junit.Test;
    -
     /**
      * IT cases for the {@link FlinkKafkaProducer08}.
      */
     @SuppressWarnings("serial")
     public class Kafka08ProducerITCase extends KafkaProducerTestBase {
     
    -	@Test
    -	public void testCustomPartitioning() {
    -		runCustomPartitioningTest();
    +	@Override
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
    --- End diff --
    
    I will not fix this test within the scope of this ticket (I'm pretty sure it is a test issue). I even think it's not worth the effort to investigate at all - it is difficult to debug those failing tests, and Kafka 0.8 is pretty old.


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125411974
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---
    @@ -80,6 +82,12 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio
     
     	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
     
    +	public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
    +		Properties properties,
    +		String topic,
    +		int partition,
    +		long timeout);
    --- End diff --
    
    nit: the indentation pattern is inconsistent with the other abstract method declarations here.


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124467740
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java ---
    @@ -18,17 +18,13 @@
     
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.junit.Test;
    -
     /**
      * IT cases for the {@link FlinkKafkaProducer09}.
      */
     @SuppressWarnings("serial")
     public class Kafka09ProducerITCase extends KafkaProducerTestBase {
    -
    -	@Test
    -	public void testCustomPartitioning() {
    -		runCustomPartitioningTest();
    +	@Override
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		// Disable this test since FlinkKafka09Producer doesn't support writing timestamps
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4206


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    thanks!


---

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r124466523
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---
    @@ -411,6 +414,18 @@ public void processElement(StreamRecord<T> element) throws Exception {
     		invokeInternal(element.getValue(), element.getTimestamp());
     	}
     
    +	@Override
    +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
    +		internalProducer.snapshotState(context);
    +	}
    +
    +	@Override
    +	public void initializeState(FunctionInitializationContext context) throws Exception {
    --- End diff --
    
    nit: I would declare `initializeState` before `snapshotState`, just for the sake of a better logic flow.
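
    For clarity, the suggested ordering would read (the `initializeState` body is inferred from the visible `snapshotState` delegation, since the diff cuts it off):

    ```java
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    	final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
    	internalProducer.initializeState(context);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    	final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
    	internalProducer.snapshotState(context);
    }
    ```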


---

[GitHub] flink issue #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarantee at-l...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4206
  
    Hmm, this is disturbing. Locally it always works for me. I have rewritten the test so that it should be less prone to intermittent failures (a longer timeout when reading from Kafka). Hopefully that will solve the issue; otherwise we still have some bug.


---