You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by rmetzger <gi...@git.apache.org> on 2016/06/15 20:08:10 UTC

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/2108

    [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

    A user on the mailing list raised the point that our Kafka producer can be made at-least-once quite easily.
    The current producer code doesn't have any guarantees 
    
    We are using the producer's callbacks to account for unacknowledged records. When a checkpoint barrier reaches the sink, it will confirm the checkpoint once all pending records have been acked.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink4027

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2108.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2108
    
----
commit d657ca8be1420a3e73c48bbbf65788fbd0b75c2c
Author: Robert Metzger <rm...@apache.org>
Date:   2016-06-15T15:50:38Z

    [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318778
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +315,38 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    +	}
    +
    +	protected abstract void flush();
    +
    +	@Override
    +	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
    +		if(flushOnCheckpoint) {
    --- End diff --
    
    space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68086007
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception {
     		} else {
     			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
     		}
    -
    +		if(flushOnCheckpoint) {
    +			pendingRecords++;
    --- End diff --
    
    The increment and decrement operations are not atomic. Thus, we have to guard them from concurrent access. Concurrent modifications can happen when you're in the `invoke` method and a callback is executed at the same time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by eliaslevy <gi...@git.apache.org>.
Github user eliaslevy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67248839
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +314,41 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    +	}
    +
    +	@Override
    +	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
    +		if(flushOnCheckpoint) {
    +			// flushing is activated: We need to wait until pendingRecords is 0
    +			while(pendingRecords > 0) {
    +				try {
    +					Thread.sleep(10);
    --- End diff --
    
    Any reason to sleep instead of calling producer.flush() to wait for the acks?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68086261
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---
    @@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> seriali
     		super(topicId, serializationSchema, producerConfig, customPartitioner);
     	}
     
    +	@Override
    +	protected void flush() {
    +		// The Kafka 0.8 producer doesn't support flushing, therefore, we are using an inefficient
    +		// busy wait approach
    +		while(pendingRecords > 0) {
    +			try {
    +				Thread.sleep(10);
    --- End diff --
    
    I'm not a big fan of busy waiting. Can't we wait on the lock for accessing `pendingRecords` and let the callbacks call `notify` when the `pendingRecords == 0`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2108
  
    Thank you for your review @tillrohrmann and @zentol . I tried addressing all your concerns.
    Please let me know what you think about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by eliaslevy <gi...@git.apache.org>.
Github user eliaslevy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67247830
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -51,10 +54,11 @@
      * Flink Sink to produce data into a Kafka topic.
      *
      * Please note that this producer does not have any reliability guarantees.
    + * The producer implements the checkpointed interface for allowing synchronization on checkpoints.
    --- End diff --
    
    May want to change:
    
    > note that this producer does not have any reliability guarantees.
    
    to
    
    > note that this producer provides at-least-once reliability guarantees when checkpoints are enabled.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

Posted by eliaslevy <gi...@git.apache.org>.
Github user eliaslevy commented on the issue:

    https://github.com/apache/flink/pull/2108
  
    \U0001f44d 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318905
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * Test ensuring that the producer is not dropping buffered records
    + */
    +@SuppressWarnings("unchecked")
    +public class AtLeastOnceProducerTest {
    +
    +	@Test
    +	public void testAtLeastOnceProducer() throws Exception {
    +		runTest(true);
    +	}
    +
    +	// This test ensures that the actual test fails if the flushing is disabled
    +	@Test(expected = AssertionError.class)
    +	public void ensureTestFails() throws Exception {
    +		runTest(false);
    +	}
    +
    +	private void runTest(boolean flushOnCheckpoint) throws Exception {
    +		Properties props = new Properties();
    +		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
    +
    +		producer.open(new Configuration());
    +
    +		for(int i = 0; i < 100; i++) {
    --- End diff --
    
    missing space after for


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318738
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception {
     		} else {
     			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
     		}
    -
    +		if(flushOnCheckpoint) {
    --- End diff --
    
    missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2108
  
    Good work @rmetzger. Well documented code and a good idea to solve the problem. I had some comments concerning concurrent accesses to `pendingRecords` and test stability on Travis. Once we've solved theses points, I think the PR is good to be merged :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68085762
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -113,6 +123,14 @@
     	/** Errors encountered in the async producer are stored here */
     	protected transient volatile Exception asyncException;
     
    +	/**
    +	 * Number of unacknowledged records.
    +	 * There is no need to introduce additional locks because invoke() and snapshotState() are
    +	 * never called concurrently. So blocking the snapshotting will lock the invoke() method until all
    +	 * pending records have been confirmed.
    +	 */
    --- End diff --
    
    I think the fact that `invoke` and `snapshotState` are mutually exclusive is not important for the semantics of the `pendingRecords` variable. The reason is that it will only be incremented in `invoke` and decremented in the `callbacks` of the Kafka producer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68081660
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -50,11 +53,13 @@
     /**
      * Flink Sink to produce data into a Kafka topic.
      *
    - * Please note that this producer does not have any reliability guarantees.
    + * Please note that this producer provides at-least-once reliability guarantees when
    + * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
    + * Otherwise, the producer doesn't provide any reliability guarantees.
    --- End diff --
    
    Does it make sense to completely remove the old behaviour and always enable flush on checkpoint? I'm wondering, because who would like to use a KafkaProducer with not processing guarantees?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68087265
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * Test ensuring that the producer is not dropping buffered records
    + */
    +@SuppressWarnings("unchecked")
    +public class AtLeastOnceProducerTest {
    +
    +	@Test
    +	public void testAtLeastOnceProducer() throws Exception {
    +		runTest(true);
    +	}
    +
    +	// This test ensures that the actual test fails if the flushing is disabled
    +	@Test(expected = AssertionError.class)
    +	public void ensureTestFails() throws Exception {
    +		runTest(false);
    +	}
    +
    +	private void runTest(boolean flushOnCheckpoint) throws Exception {
    +		Properties props = new Properties();
    +		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
    +
    +		producer.open(new Configuration());
    +
    +		for(int i = 0; i < 100; i++) {
    +			producer.invoke("msg-" + i);
    +		}
    +		// start a thread confirming all pending records
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final AtomicBoolean markOne = new AtomicBoolean(false);
    +		Runnable confirmer = new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					MockProducer mp = producer.getProducerInstance();
    +					List<Callback> pending = mp.getPending();
    +
    +					// we ensure thread A is locked and didn't reach markOne
    +					// give thread A some time to really reach the snapshot state
    +					Thread.sleep(500);
    --- End diff --
    
    Tests based on `Thread.sleep` are bound to become flakey on Travis. Usually it is better to synchronize on futures or locks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68082639
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -50,11 +53,13 @@
     /**
      * Flink Sink to produce data into a Kafka topic.
      *
    - * Please note that this producer does not have any reliability guarantees.
    + * Please note that this producer provides at-least-once reliability guarantees when
    + * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
    + * Otherwise, the producer doesn't provide any reliability guarantees.
    --- End diff --
    
    My reasoning here was that we first provide this as an optional feature to those users who know what they are doing / what they need to give the feature exposure.
    I want to be certain that it works in all environments before we activate it by default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r69136271
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -35,69 +35,77 @@
     import org.apache.kafka.common.serialization.ByteArraySerializer;
     import org.junit.Assert;
     import org.junit.Test;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.Serializable;
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Map;
     import java.util.Properties;
     import java.util.concurrent.Future;
    -import java.util.concurrent.atomic.AtomicBoolean;
     
     /**
      * Test ensuring that the producer is not dropping buffered records
      */
     @SuppressWarnings("unchecked")
     public class AtLeastOnceProducerTest {
     
    -	@Test
    +	// we set a timeout because the test will not finish if the logic is broken
    +	@Test(timeout=5000)
     	public void testAtLeastOnceProducer() throws Exception {
     		runTest(true);
     	}
     
     	// This test ensures that the actual test fails if the flushing is disabled
    -	@Test(expected = AssertionError.class)
    +	@Test(expected = AssertionError.class, timeout=5000)
     	public void ensureTestFails() throws Exception {
     		runTest(false);
     	}
     
     	private void runTest(boolean flushOnCheckpoint) throws Exception {
     		Properties props = new Properties();
    -		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +		final OneShotLatch snapshottingFinished = new OneShotLatch();
    +		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
    +				snapshottingFinished);
     		producer.setFlushOnCheckpoint(flushOnCheckpoint);
     		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
     
     		producer.open(new Configuration());
     
    -		for(int i = 0; i < 100; i++) {
    +		for (int i = 0; i < 100; i++) {
     			producer.invoke("msg-" + i);
     		}
     		// start a thread confirming all pending records
     		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final AtomicBoolean markOne = new AtomicBoolean(false);
    +		final Thread threadA = Thread.currentThread();
    +
     		Runnable confirmer = new Runnable() {
     			@Override
     			public void run() {
     				try {
     					MockProducer mp = producer.getProducerInstance();
     					List<Callback> pending = mp.getPending();
     
    -					// we ensure thread A is locked and didn't reach markOne
    -					// give thread A some time to really reach the snapshot state
    -					Thread.sleep(500);
    -					if(markOne.get()) {
    -						Assert.fail("Snapshot was confirmed even though messages " +
    -								"were still in the buffer");
    +					// we need to find out if the snapshot() method blocks forever
    +					// this is not possible. If snapshot() is running, it will
    +					// start removing elements from the pending list.
    +					synchronized (threadA) {
    +						threadA.wait(500L);
     					}
    +					// we now check that no records have been confirmed yet
     					Assert.assertEquals(100, pending.size());
    +					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    +							snapshottingFinished.hasTriggered());
     
     					// now confirm all checkpoints
    -					for(Callback c: pending) {
    +					for (Callback c: pending) {
     						c.onCompletion(null, null);
     					}
     					pending.clear();
    -					// wait for the snapshotState() method to return
    -					Thread.sleep(100);
    -					Assert.assertTrue("Snapshot state didn't return", markOne.get());
    +					// wait for the snapshotState() method to return. The will
    +					// fail if snapshotState never returns.
    +					snapshottingFinished.await();
    --- End diff --
    
    I think you don't need this condition here. There are two possibilities: 
    
    1. ThreadA leaves `snapshotState` successfully and waits for `threadB`. Since it completed `snapshotState`, the `snapshottingFinished` will be triggered. Thus, there is no waiting.
    
    2. ThreadA blocks in `snapshotState`. Then `threadB` does not have to block to trigger the timeout, because `threadA` is already blocked.
    
    Consequently, I think you could replace the `OneShotLatch` with a volatile boolean.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r69130106
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -107,15 +115,16 @@ public void run() {
     		threadB.start();
     		// this should block:
     		producer.snapshotState(0, 0);
    -		// once all pending callbacks are confirmed, we can set this marker to true
    -		markOne.set(true);
    -		for(int i = 0; i < 99; i++) {
    -			producer.invoke("msg-" + i);
    +		synchronized (threadA) {
    +			threadA.notifyAll(); // just in case, to let the test fail faster
     		}
    -		// wait at most one second
    -		threadB.join(800L);
    -		Assert.assertFalse("Thread A reached this point too fast", threadB.isAlive());
    -		if(runnableError.f0 != null) {
    +
    +		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    +		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    +			threadB.join(500);
    +		}
    +		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		if (runnableError.f0 != null) {
     			runnableError.f0.printStackTrace();
     			Assert.fail("Error from thread B: " + runnableError.f0 );
    --- End diff --
    
    Printing the stack trace to stdout is imo not so good. The problem is that the stack trace will be intermingled with the rest of the testing log output. I think it's better to simply rethrow the `Throwable` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68086118
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +315,38 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    --- End diff --
    
    I think it is necessary to guard against concurrent changes from the `invoke` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2108


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r69136462
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -107,15 +115,16 @@ public void run() {
     		threadB.start();
     		// this should block:
     		producer.snapshotState(0, 0);
    -		// once all pending callbacks are confirmed, we can set this marker to true
    -		markOne.set(true);
    -		for(int i = 0; i < 99; i++) {
    -			producer.invoke("msg-" + i);
    +		synchronized (threadA) {
    +			threadA.notifyAll(); // just in case, to let the test fail faster
     		}
    -		// wait at most one second
    -		threadB.join(800L);
    -		Assert.assertFalse("Thread A reached this point too fast", threadB.isAlive());
    -		if(runnableError.f0 != null) {
    +
    --- End diff --
    
    I would insert here an assertion which checks that the number of pendingRecords is `0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318851
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +315,38 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    --- End diff --
    
    space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68087013
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * Test ensuring that the producer is not dropping buffered records
    + */
    +@SuppressWarnings("unchecked")
    +public class AtLeastOnceProducerTest {
    +
    +	@Test
    +	public void testAtLeastOnceProducer() throws Exception {
    +		runTest(true);
    +	}
    +
    +	// This test ensures that the actual test fails if the flushing is disabled
    +	@Test(expected = AssertionError.class)
    +	public void ensureTestFails() throws Exception {
    +		runTest(false);
    +	}
    +
    +	private void runTest(boolean flushOnCheckpoint) throws Exception {
    +		Properties props = new Properties();
    +		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
    +
    +		producer.open(new Configuration());
    +
    +		for(int i = 0; i < 100; i++) {
    +			producer.invoke("msg-" + i);
    +		}
    +		// start a thread confirming all pending records
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final AtomicBoolean markOne = new AtomicBoolean(false);
    +		Runnable confirmer = new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					MockProducer mp = producer.getProducerInstance();
    +					List<Callback> pending = mp.getPending();
    +
    +					// we ensure thread A is locked and didn't reach markOne
    +					// give thread A some time to really reach the snapshot state
    +					Thread.sleep(500);
    +					if(markOne.get()) {
    +						Assert.fail("Snapshot was confirmed even though messages " +
    +								"were still in the buffer");
    +					}
    +					Assert.assertEquals(100, pending.size());
    +
    +					// now confirm all checkpoints
    +					for(Callback c: pending) {
    +						c.onCompletion(null, null);
    +					}
    +					pending.clear();
    +					// wait for the snapshotState() method to return
    +					Thread.sleep(100);
    +					Assert.assertTrue("Snapshot state didn't return", markOne.get());
    +				} catch(Throwable t) {
    +					runnableError.f0 = t;
    +				}
    +			}
    +		};
    +		Thread threadB = new Thread(confirmer);
    +		threadB.start();
    +		// this should block:
    +		producer.snapshotState(0, 0);
    +		// once all pending callbacks are confirmed, we can set this marker to true
    +		markOne.set(true);
    +		for(int i = 0; i < 99; i++) {
    +			producer.invoke("msg-" + i);
    +		}
    --- End diff --
    
    Why do you invoke again the producer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318807
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +315,38 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    +	}
    +
    +	protected abstract void flush();
    +
    +	@Override
    +	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
    +		if(flushOnCheckpoint) {
    +			// flushing is activated: We need to wait until pendingRecords is 0
    +			flush();
    +
    +			if(pendingRecords != 0) {
    --- End diff --
    
    space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318603
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---
    @@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> seriali
     		super(topicId, serializationSchema, producerConfig, customPartitioner);
     	}
     
    +	@Override
    +	protected void flush() {
    +		// The Kafka 0.8 producer doesn't support flushing, therefore, we are using an inefficient
    +		// busy wait approach
    +		while(pendingRecords > 0) {
    --- End diff --
    
    missing space after while


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67318672
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---
    @@ -127,4 +127,11 @@ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> seriali
     	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
     		super(topicId, serializationSchema, producerConfig, customPartitioner);
     	}
    +
    +	@Override
    +	protected void flush() {
    +		if(this.producer != null) {
    --- End diff --
    
    missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2108
  
    Changes look good to me @rmetzger :-) I had only some minor comments. +1 for merging after addressing the comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68087375
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * Test ensuring that the producer is not dropping buffered records
    + */
    +@SuppressWarnings("unchecked")
    +public class AtLeastOnceProducerTest {
    +
    +	@Test
    +	public void testAtLeastOnceProducer() throws Exception {
    +		runTest(true);
    +	}
    +
    +	// This test ensures that the actual test fails if the flushing is disabled
    +	@Test(expected = AssertionError.class)
    +	public void ensureTestFails() throws Exception {
    +		runTest(false);
    +	}
    +
    +	private void runTest(boolean flushOnCheckpoint) throws Exception {
    +		Properties props = new Properties();
    +		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
    +
    +		producer.open(new Configuration());
    +
    +		for(int i = 0; i < 100; i++) {
    +			producer.invoke("msg-" + i);
    +		}
    +		// start a thread confirming all pending records
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final AtomicBoolean markOne = new AtomicBoolean(false);
    +		Runnable confirmer = new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					MockProducer mp = producer.getProducerInstance();
    +					List<Callback> pending = mp.getPending();
    +
    +					// we ensure thread A is locked and didn't reach markOne
    +					// give thread A some time to really reach the snapshot state
    +					Thread.sleep(500);
    +					if(markOne.get()) {
    +						Assert.fail("Snapshot was confirmed even though messages " +
    +								"were still in the buffer");
    +					}
    +					Assert.assertEquals(100, pending.size());
    +
    +					// now confirm all checkpoints
    +					for(Callback c: pending) {
    +						c.onCompletion(null, null);
    +					}
    +					pending.clear();
    +					// wait for the snapshotState() method to return
    +					Thread.sleep(100);
    --- End diff --
    
    Here again, who tells us that Thread A has progressed after 100 ms to the point where he set `markOne` to true.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2108
  
    Thank you for the review @eliaslevy!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68087686
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -50,11 +53,13 @@
     /**
      * Flink Sink to produce data into a Kafka topic.
      *
    - * Please note that this producer does not have any reliability guarantees.
    + * Please note that this producer provides at-least-once reliability guarantees when
    + * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
    + * Otherwise, the producer doesn't provide any reliability guarantees.
    --- End diff --
    
    Alright, this makes totally sense


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67296678
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +314,41 @@ public void close() throws Exception {
     		checkErroneous();
     	}
     
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    +	}
    +
    +	@Override
    +	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
    +		if(flushOnCheckpoint) {
    +			// flushing is activated: We need to wait until pendingRecords is 0
    +			while(pendingRecords > 0) {
    +				try {
    +					Thread.sleep(10);
    --- End diff --
    
    The problem is that the `flush()` method is only implemented by the Kafka 0.9 producer, not by the 0.8 implementation.
    As you can see from the classname, its the shared base class between the two version specific implementations. I think for the 0.8 producer, there is no way around the waiting approach.
    
    I'll update the pull request to call `flush()` on the 0.9 producer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---