Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/07/03 14:17:45 UTC

[GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125295964
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +194,118 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
    +	 * This test configures KafkaProducer so that it does not flush data automatically,
    +	 * then fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    --- End diff --
    
    Why do we need to reset `FailingIdentityMapper.failedBefore` here? I don't see `FailingIdentityMapper` used anywhere in this pipeline; the only mapper in the job is `BrokerRestartingMapper`.
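
To illustrate the concern, here is a minimal sketch of why resetting a static fail-once flag only matters when the mapper that reads it is part of the pipeline. The classes `FailOnceMapper` and `PlainMapper` and both `runPipeline...` helpers are hypothetical stand-ins written for this example; they are not Flink's test utilities, and the real `FailingIdentityMapper` may behave differently.

```java
public class StaticFlagResetSketch {

    /** Hypothetical stand-in for a test mapper that fails at most once. */
    static class FailOnceMapper {
        // Static so the flag survives the mapper being re-created after a restart.
        static volatile boolean failedBefore = false;

        private final int failAfterElements;
        private int seen = 0;

        FailOnceMapper(int failAfterElements) {
            this.failAfterElements = failAfterElements;
        }

        int map(int value) {
            seen++;
            if (!failedBefore && seen >= failAfterElements) {
                failedBefore = true;
                throw new IllegalStateException("Artificial failure");
            }
            return value;
        }
    }

    /** Hypothetical stand-in for a mapper that has no static state. */
    static class PlainMapper {
        int map(int value) {
            return value;
        }
    }

    /** Pipeline that never uses FailOnceMapper; returns the number of elements processed. */
    static int runPipelineWithoutFailOnceMapper(int numElements) {
        // This reset has no observable effect: nothing in this pipeline reads the flag.
        FailOnceMapper.failedBefore = false;
        PlainMapper mapper = new PlainMapper();
        int processed = 0;
        for (int i = 0; i < numElements; i++) {
            mapper.map(i);
            processed++;
        }
        return processed;
    }

    /** Pipeline that does use FailOnceMapper; returns true if the artificial failure fired. */
    static boolean runPipelineWithFailOnceMapper(int numElements, int failAfterElements) {
        FailOnceMapper mapper = new FailOnceMapper(failAfterElements);
        try {
            for (int i = 0; i < numElements; i++) {
                mapper.map(i);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The flag's value is irrelevant when the mapper is not in the pipeline.
        System.out.println(runPipelineWithoutFailOnceMapper(1000));

        // A stale flag left over from an earlier test suppresses the failure...
        FailOnceMapper.failedBefore = true;
        System.out.println(runPipelineWithFailOnceMapper(1000, 333));

        // ...so the reset belongs in tests that actually use the mapper.
        FailOnceMapper.failedBefore = false;
        System.out.println(runPipelineWithFailOnceMapper(1000, 333));
    }
}
```

If the reset is only a leftover from another test, dropping it would not change this test's behavior; if some shared helper does read the flag, a short comment at the reset would make that dependency visible.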

