Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/02/06 16:55:17 UTC

[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3278

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

    Prior to this change, at-least-once was violated for the FlinkKafkaProducer, because checkpoints were not failed when the Kafka producer reported async exceptions.
    
    With this PR, `snapshotState()` fails if there previously were async exceptions, and also fails if the records flushed on checkpoint resulted in exceptions.
    
    This PR also improves the tests in `FlinkKafkaProducerBaseTest` to use one-shot latches instead of sleeping, which makes the tests more stable. It also removes the test `testAtLeastOnceProducerFailsIfFlushingDisabled()`. My reasoning is that you _might_ still get at-least-once even with flushing disabled (i.e. if there happen to be no pending records to flush at each checkpoint), so I don't see the necessity of that test. I'm open to discussing the removal and adding the test back if others think it's necessary.
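
    Conceptually, the change boils down to checking the pending async error both before and after the flush in the snapshot path. A minimal sketch of the idea (the field and method names below are illustrative and may not match the actual `FlinkKafkaProducerBase` code exactly):

        // Sketch only: hypothetical names, not the exact FlinkKafkaProducerBase signatures.
        abstract class CheckpointedProducerSketch {

            /** Set by the Kafka send() Callback when a record fails asynchronously. */
            private volatile Exception asyncException;

            private boolean flushOnCheckpoint = true;

            /** Blocks until all pending records have been acknowledged. */
            abstract void flush();

            void snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
                // fail the checkpoint if an earlier send() already reported an async error
                checkErroneous();

                if (flushOnCheckpoint) {
                    flush();
                    // the records flushed just now may themselves have completed with errors
                    checkErroneous();
                }
            }

            private void checkErroneous() throws Exception {
                Exception e = asyncException;
                if (e != null) {
                    asyncException = null;
                    throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
                }
            }
        }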

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5701

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3278
    
----
commit c3eb0a905b86c6265af91c4bda7d2c9da2dc6ce2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-02-06T16:37:13Z

    [FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints

----



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100696016
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    --- End diff --
    
    Maybe you could override the `DummyFlinkKafkaProducer#flush` method to insert some latches to see when you enter and when the method is done. Then you could wait on the first latch and check with the latter whether the method has completed.
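
    A rough, self-contained sketch of that latch pattern (a plain `CountDownLatch` is used here for brevity; the Flink tests would presumably use `OneShotLatch`, and all names below are illustrative):

        import java.util.concurrent.CountDownLatch;

        // Illustrative stand-in for a DummyFlinkKafkaProducer with an overridden flush().
        class FlushProbe {
            final CountDownLatch flushEntered = new CountDownLatch(1);
            final CountDownLatch flushDone = new CountDownLatch(1);

            void flush() throws InterruptedException {
                flushEntered.countDown();  // signal: the snapshotting thread has reached flush()
                waitForPendingRecords();   // blocks until all pending callbacks have completed
                flushDone.countDown();     // signal: flush() has returned
            }

            void waitForPendingRecords() throws InterruptedException {
                // placeholder for the real "wait until pendingRecords == 0" loop
            }
        }

        // The test thread can then wait deterministically instead of sleeping:
        //   probe.flushEntered.await();              // snapshot() is now inside flush()
        //   assert probe.flushDone.getCount() == 1;  // flush() must not have completed yet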



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101973453
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,201 +85,321 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    -
    -		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +
    +		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
     			FakeStandardProducerConfig.get(), mockPartitioner);
     		producer.setRuntimeContext(mockRuntimeContext);
     
    +		final KafkaProducer mockProducer = producer.getMockKafkaProducer();
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
    +
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +
    +			// test succeeded
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +
    +			// test succeeded
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
     
    -		Runnable confirmer = new Runnable() {
    +		CheckedThread snapshotThread = new CheckedThread() {
     			@Override
    -			public void run() {
    -				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    -				}
    +			public void go() throws Exception {
    +				// this should block at first, since there are still two pending records that need to be flushed
    +				testHarness.snapshot(123L, 123L);
     			}
     		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    -		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    -		if (runnableError.f0 != null) {
    -			throw runnableError.f0;
    +		try {
    +			snapshotThread.sync();
    +		} catch (Exception e) {
    +			// the snapshot should have failed with the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +
    +			// test succeeded
    +			return;
     		}
     
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		CheckedThread snapshotThread = new CheckedThread() {
    +			@Override
    +			public void go() throws Exception {
    +				// this should block until all records are flushed;
    +				// if the snapshot implementation returns before pending records are flushed, the test will fail
    +				testHarness.snapshot(123L, 123L);
    +			}
    +		};
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked;
    +		// after some arbitrary time, the snapshot should still be blocked
    +		snapshotThread.join(3000);
    --- End diff --
    
    Now I see what you mean, and the purpose of the latch you were previously proposing. Thanks for the comment. I will address this and proceed to merge the PR later today.




[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101273458
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    --- End diff --
    
    I've removed the latch completely. You're right, the latch was unnecessary.



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101971157
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,201 +85,321 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    -
    -		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +
    +		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
     			FakeStandardProducerConfig.get(), mockPartitioner);
     		producer.setRuntimeContext(mockRuntimeContext);
     
    +		final KafkaProducer mockProducer = producer.getMockKafkaProducer();
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
    +
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +
    +			// test succeeded
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +
    +			// test succeeded
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
     
    -		Runnable confirmer = new Runnable() {
    +		CheckedThread snapshotThread = new CheckedThread() {
     			@Override
    -			public void run() {
    -				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    -				}
    +			public void go() throws Exception {
    +				// this should block at first, since there are still two pending records that need to be flushed
    +				testHarness.snapshot(123L, 123L);
     			}
     		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    -		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    -		if (runnableError.f0 != null) {
    -			throw runnableError.f0;
    +		try {
    +			snapshotThread.sync();
    +		} catch (Exception e) {
    +			// the snapshot should have failed with the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +
    +			// test succeeded
    +			return;
     		}
     
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		CheckedThread snapshotThread = new CheckedThread() {
    +			@Override
    +			public void go() throws Exception {
    +				// this should block until all records are flushed;
    +				// if the snapshot implementation returns before pending records are flushed, the test will fail
    +				testHarness.snapshot(123L, 123L);
    +			}
    +		};
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked;
    +		// after some arbitrary time, the snapshot should still be blocked
    +		snapshotThread.join(3000);
    --- End diff --
    
    Blocking for 3 seconds is not good. It's basically a `sleep` what you're doing here. Better to wait on a latch which is triggered when you enter the flush method, for example.
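
    For illustration, the `join(3000)` could be replaced with something like the following (sketch only; triggering `flushStarted` from an overridden `DummyFlinkKafkaProducer#flush` is an assumed test hook):

        final OneShotLatch flushStarted = new OneShotLatch();
        // assumed: DummyFlinkKafkaProducer#flush is overridden to call flushStarted.trigger()

        snapshotThread.start();

        // deterministic: returns as soon as snapshot() actually enters flush()
        flushStarted.await();

        // the snapshot thread must still be blocked, since two records are still pending
        Assert.assertTrue(snapshotThread.isAlive());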



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann thanks for the comment. I'll try again and see if I can come up with a proper test for `testAtLeastOnceProducerFailsIfFlushingDisabled()`.



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann 
    Yes, under the condition you described, at-least-once doesn't hold. I said _might_ mainly because there is a chance that, for every checkpoint barrier, the preceding events (i.e. `event1` in your example) have already been committed. But yes, essentially this indeterminate behaviour means that it is not at-least-once, so I'm incorrect in saying that it _might_.
    
    I was trying to point out that this indeterminate behaviour made it hard to have a stable test for `testAtLeastOnceProducerFailsIfFlushingDisabled()` without relying on sleeping, which ultimately leads to flaky tests.



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann I've hopefully addressed your comments. Could you please have a look?
    
    The major change was to refactor the `DummyFlinkKafkaProducer` so that a simple mock `KafkaProducer` can be provided for tests where a simple `verify(mockProducer).send(...)` is sufficient. The refactored version also has the advantage that it exercises the connector's actual `Callback` implementation (verifying that it correctly decrements the `pendingRecords` count).
    
    Note: the only thing I could not really improve is the `testAtLeastOnceProducer` test. Like before, the refactored version still essentially checks continuously whether the snapshot method has returned early. The test is definitely stable; I just don't think there is a better approach for it.
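
    For readers following along, a rough sketch of that refactoring idea (Mockito-based; the names below are illustrative and may not match the PR exactly):

        import static org.mockito.Mockito.*;

        import java.util.ArrayList;
        import java.util.List;
        import org.apache.kafka.clients.producer.Callback;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        @SuppressWarnings("unchecked")
        KafkaProducer<byte[], byte[]> mockProducer = (KafkaProducer<byte[], byte[]>) mock(KafkaProducer.class);
        final List<Callback> pendingCallbacks = new ArrayList<>();

        // capture every Callback the connector hands to send(), so a test can
        // complete them manually; completing them exercises the connector's real
        // Callback implementation, including the pendingRecords decrement
        when(mockProducer.send(any(ProducerRecord.class), any(Callback.class)))
            .thenAnswer(invocation -> {
                pendingCallbacks.add((Callback) invocation.getArguments()[1]);
                return null;
            });

        // e.g. succeed the first record, then fail the second with an async exception:
        //   pendingCallbacks.get(0).onCompletion(null, null);
        //   pendingCallbacks.get(1).onCompletion(null, new Exception("artificial async exception"));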



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100035659
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by verifying that, if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    +
    +		inputLatch.await();
    --- End diff --
    
    Correct, I'll change this. The other tests might actually not need it either; I'll check them as well.



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101273261
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
    --- End diff --
    
    Thanks for this tip! Will change.



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695926
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    --- End diff --
    
    The same could be done via `Thread.join(3, Seconds)`.
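    
    For reference, that variant would look like this in the test above (a fragment reusing the test's `snapshotThread` and JUnit's `Assert`; note that `Thread#join` takes a timeout in milliseconds):
    
        // Wait up to 3 seconds for the snapshot thread to finish.
        snapshotThread.join(3000L);
        // If it is still alive, snapshot() is still (correctly) blocked.
        Assert.assertTrue("Snapshot returned before all records were flushed",
            snapshotThread.isAlive());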



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100036593
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    --- End diff --
    
    That makes sense. I was just reusing the `MockProducer` implementation in this class, which is needed in other tests to be able to provide fake success / failure callback completions. In this test we don't need to do that, so a simple `mock(KafkaProducer.class)` should do.



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Ah, now I understand what the problem of `testAtLeastOnceProducerFailsIfFlushingDisabled` was. Can't we mock the `KafkaProducer` to control when the records' callbacks are triggered? That way we have full control over the execution and can write a proper test.
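    
    One way to get that control with plain Mockito, sketched under the assumption that the sink hands a fresh `Callback` to every `send()` call (harness setup elided; class and method names are made up for the sketch):
    
        import static org.mockito.Mockito.*;
        
        import java.util.List;
        import org.apache.kafka.clients.producer.Callback;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.mockito.ArgumentCaptor;
        
        class CallbackCaptureSketch {
            @SuppressWarnings("unchecked")
            static void completeCallbacksByHand(KafkaProducer<byte[], byte[]> mockProducer) {
                // Capture every callback the sink handed to send(), then let the
                // test decide when, and with what result, each request completes.
                ArgumentCaptor<Callback> callbacks = ArgumentCaptor.forClass(Callback.class);
                verify(mockProducer, atLeastOnce())
                    .send(any(ProducerRecord.class), callbacks.capture());
        
                List<Callback> pending = callbacks.getAllValues();
                pending.get(0).onCompletion(null, null);                          // succeed record 1
                pending.get(1).onCompletion(null, new Exception("fake failure")); // fail record 2
            }
        }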



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3278



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Thanks for the detailed review @tillrohrmann, I'll follow-up and address your comments.
    
    Regarding removing the `setFlushOnCheckpoint`:
    I think it was added at first to provide flexibility for users who know what they are doing, and to make sure that the producer can work in all environments (see comments in #2108).
    
    However, I've recently also gathered opinions (from you and others) that the setting overcomplicates the at-least-once guarantees of the producer, and I have the feeling we can remove it starting from the next release.
    
    There is FLINK-5728 to enable flushing by default (currently the default is no flushing). I'll incorporate your opinion on this into that JIRA, and decide there whether we only want to change the default or remove the setting completely.
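    
    For context, this is how the switch is used today (a usage sketch; topic, schema, and properties are placeholders):
    
        import java.util.Properties;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
        import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
        
        class FlushSettingSketch {
            static FlinkKafkaProducer09<String> newSink() {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
                FlinkKafkaProducer09<String> sink =
                    new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), props);
                // As of this PR the default is still no flushing, so at-least-once
                // requires this explicit opt-in (FLINK-5728 may change the default):
                sink.setFlushOnCheckpoint(true);
                return sink;
            }
        }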




[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Tests have passed; the failed ones are Travis timeouts. Merging this now ...



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    I don't think that you have at least once guarantees if you disable flushing. Assume the following: You have the input `event1, checkpoint barrier, event2`. Now you write `event1` to Kafka but it is not yet committed. Now you process the checkpoint barrier and complete the checkpoint without waiting for `event1` to be written. Now writing `event1` to Kafka fails for some reason and triggers a recovery. Then you will start at `event2` without having ever written `event1` to Kafka.
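    
    A sketch of the guard that closes exactly this gap (assumed names; see the PR itself for the real implementation):
    
        import org.apache.flink.runtime.state.FunctionSnapshotContext;
        
        // Sketch: the checkpoint must not complete while records are in flight.
        abstract class FlushOnCheckpointSketch {
            final Object pendingRecordsLock = new Object();
            long pendingRecords;
            volatile Exception asyncException;
            boolean flushOnCheckpoint = true;
        
            public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
                checkErroneous();            // rethrow a previous async failure first
                if (flushOnCheckpoint) {
                    flush();                 // push buffered records out of the producer
                    synchronized (pendingRecordsLock) {
                        while (pendingRecords > 0) {
                            pendingRecordsLock.wait(); // woken by the send() callback
                        }
                    }
                    checkErroneous();        // a flushed record may have failed meanwhile
                }
            }
        
            void checkErroneous() throws Exception {
                Exception e = asyncException;
                if (e != null) {
                    asyncException = null;
                    throw new Exception("Failed to send data to Kafka", e);
                }
            }
        
            abstract void flush(); // e.g. delegate to KafkaProducer#flush()
        }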



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r101275048
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
    +		// being extra safe that the snapshot is correctly blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +		} catch (TimeoutException expected) {
    +			//
     		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(2, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(1, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +		Assert.assertEquals(0, producer.getPendingSize());
    +
    +		snapshotReturnedLatch.await();
    +		snapshotThread.join();
    +
     		if (runnableError.f0 != null) {
     			throw runnableError.f0;
     		}
     
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
     
    -	// ------------------------------------------------------------------------
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(false);
     
    -	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    -		private static final long serialVersionUID = 1L;
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
    -		private transient MockProducer prod;
    -		private AtomicBoolean snapshottingFinished;
    +		testHarness.open();
     
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = snapshottingFinished;
    -		}
    +		testHarness.processElement(new StreamRecord<>("msg"));
     
    -		// constructor variant for test irrelated to snapshotting
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = new AtomicBoolean(true);
    -		}
    +		// make sure the record was sent (its callback is never completed in this test)
    +		verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
     
    -		@Override
    -		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
    -			this.prod = new MockProducer();
    -			return this.prod;
    -		}
    +		// should return even if there are pending records
    +		testHarness.snapshot(123L, 123L);
     
    -		@Override
    -		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    -			// call the actual snapshot state
    -			super.snapshotState(ctx);
    -			// notify test that snapshotting has been done
    -			snapshottingFinished.set(true);
    -		}
    +		testHarness.close();
    +	}
     
    -		@Override
    -		protected void flush() {
    -			this.prod.flush();
    -		}
    +	// ------------------------------------------------------------------------
     
    -		public MockProducer getProducerInstance() {
    -			return this.prod;
    -		}
    -	}
    +	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    +		private static final long serialVersionUID = 1L;
    +		
    +		private final static String DUMMY_TOPIC = "dummy-topic";
     
    -	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
    -		List<Callback> pendingCallbacks = new ArrayList<>();
    +		private final KafkaProducer mockProducer;
    --- End diff --
    
    Will change to `transient`.
    
    Curious, though: is this just for documentation purposes for readers of the code?
    
    Otherwise, we need the `mockProducer` throughout the whole life cycle of the object (I tried instantiating a mocked `KafkaProducer` in `open()`, as we would normally do for non-serializable fields, but it won't work because `testPartitionerOpenedWithDeterminatePartitionList` needs to mock more method calls before calling `open()`). Also, in the tests we're not serializing the instances anyway.
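    
    (For readers following along: `transient` only matters when the object actually goes through Java serialization, which these test instances never do, so the keyword is indeed mostly documentation here. A tiny self-contained demo:)
    
        import java.io.*;
        
        class TransientDemo implements Serializable {
            String kept = "kept";
            transient String dropped = "dropped"; // skipped by serialization
        
            public static void main(String[] args) throws Exception {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(new TransientDemo());
                oos.flush();
                TransientDemo copy = (TransientDemo) new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray())).readObject();
                System.out.println(copy.kept + " / " + copy.dropped); // prints: kept / null
            }
        }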



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Instead of directly re-adding `testAtLeastOnceProducerFailsIfFlushingDisabled`, I added a test `testDoesNotWaitForPendingRecordsIfFlushingDisabled` that simply asserts that the snapshot method returns even if there are pending records.
    
    The purpose of the `testDoesNotWaitForPendingRecordsIfFlushingDisabled` test is to assure that the actual `testAtLeastOnceProducer` test is valid.
    I think this was what the original `testAtLeastOnceProducerFailsIfFlushingDisabled` was actually meant for anyway.
    
    @tillrohrmann please let me know if this makes sense to you :-)



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100030887
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    --- End diff --
    
    Can't we simply give this instance a mock `KafkaProducer`? Then we could easily check via `verify` whether the `send` method has been called.
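    
    Sketched standalone, the suggested `verify`-based check (which also appears in the updated tests above) looks like this; the mock and harness setup are elided:
    
        import static org.mockito.Mockito.*;
        
        import org.apache.kafka.clients.producer.Callback;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        
        class VerifySendSketch {
            @SuppressWarnings("unchecked")
            static void assertOneRecordForwarded(KafkaProducer<byte[], byte[]> mockProducer) {
                // Fails the test unless the sink forwarded exactly one record.
                verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
            }
        }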



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695724
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
     +		// extra safety check that the snapshot call is indeed still blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +		} catch (TimeoutException expected) {
    +			//
     		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(2, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(1, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +		Assert.assertEquals(0, producer.getPendingSize());
    +
    +		snapshotReturnedLatch.await();
    +		snapshotThread.join();
    +
     		if (runnableError.f0 != null) {
     			throw runnableError.f0;
     		}
     
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
     
    -	// ------------------------------------------------------------------------
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(false);
     
    -	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    -		private static final long serialVersionUID = 1L;
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
    -		private transient MockProducer prod;
    -		private AtomicBoolean snapshottingFinished;
    +		testHarness.open();
     
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = snapshottingFinished;
    -		}
    +		testHarness.processElement(new StreamRecord<>("msg"));
     
    -		// constructor variant for test irrelated to snapshotting
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = new AtomicBoolean(true);
    -		}
    +		// make sure that all callbacks have not been completed
    +		verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
     
    -		@Override
    -		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
    -			this.prod = new MockProducer();
    -			return this.prod;
    -		}
    +		// should return even if there are pending records
    +		testHarness.snapshot(123L, 123L);
     
    -		@Override
    -		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    -			// call the actual snapshot state
    -			super.snapshotState(ctx);
    -			// notify test that snapshotting has been done
    -			snapshottingFinished.set(true);
    -		}
    +		testHarness.close();
    +	}
     
    -		@Override
    -		protected void flush() {
    -			this.prod.flush();
    -		}
    +	// ------------------------------------------------------------------------
     
    -		public MockProducer getProducerInstance() {
    -			return this.prod;
    -		}
    -	}
    +	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    +		private static final long serialVersionUID = 1L;
    +		
     +		private static final String DUMMY_TOPIC = "dummy-topic";
     
    -	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
    -		List<Callback> pendingCallbacks = new ArrayList<>();
    +		private final KafkaProducer mockProducer;
    --- End diff --
    
    Raw usage of Kafka producer
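    
    For example, the field could carry explicit type parameters instead of the raw type; a minimal sketch (the byte[] key/value types here are only an assumption for illustration):
    
        private final KafkaProducer<byte[], byte[]> mockProducer;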



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    I think we should allow users to disable the wait on flush, because it can substantially delay the confirmation of a checkpoint.
    If a user favors fast checkpoints over complete data in Kafka (for example when a particular producer instance is used mostly for debugging purposes only), we should allow them to do that. The overhead for us making this configurable is very low, but the benefit for some users might be huge.
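    
    For illustration, opting out of the flush wait on the user side might look like the sketch below (assuming the 0.9 connector; the topic name, config values, and the surrounding `stream` are placeholders):
    
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        
        FlinkKafkaProducer09<String> kafkaSink = new FlinkKafkaProducer09<>(
            "debug-topic", new SimpleStringSchema(), props);
        // favor fast checkpoint confirmation over complete data in Kafka
        kafkaSink.setFlushOnCheckpoint(false);
        
        stream.addSink(kafkaSink);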



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100030377
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    +
    +		inputLatch.await();
    +
    +		// make sure that all callbacks have not been completed
    +		Assert.assertEquals(100, pending.size());
    +
    +		// use a separate thread to continuously monitor whether snapshotting has returned
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					runnableError.f0 = e;
    +				}
    +			}
    +		});
    --- End diff --
    
    Why this additional `Thread`? I think calling `testHarness.snapshot` directly is perfectly fine. Whether you block on this call or on the `Thread.join` does not make a difference.
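    
    i.e. the test body could simply be (sketch):
    
        // blocks the calling test thread directly; the @Test timeout still guards against hangs
        testHarness.snapshot(123L, 123L);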



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100482362
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    --- End diff --
    
    Turns out that we must have an extension of `KafkaProducer` provided to `FlinkKafkaProducerBase`. The reason is that the producer base requires implementing an abstract `flush` method, which doesn't exist on the `KafkaProducer`'s interface. So it'll be more straightforward to extend `KafkaProducer` and implement a `flush` method ourselves, and let `FlinkKafkaProducerBase` call that.
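    
    Roughly the shape I have in mind (a sketch only; the pending-callback bookkeeping and the Properties-based constructor are assumptions, not the final code):
    
        private static class MockProducer<K, V> extends KafkaProducer<K, V> {
            private final List<Callback> pendingCallbacks = new ArrayList<>();
        
            MockProducer(Properties props) {
                super(props);
            }
        
            @Override
            public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
                // buffer the callback instead of actually sending anything
                pendingCallbacks.add(callback);
                return null;
            }
        
            // the flush() that FlinkKafkaProducerBase's abstract flush() delegates to;
            // completing all callbacks releases anything blocked on pending records
            public void flush() {
                for (Callback c : pendingCallbacks) {
                    c.onCompletion(null, null);
                }
                pendingCallbacks.clear();
            }
        }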



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695937
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
     +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
     +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
     +		// extra safety check that the snapshot call is indeed still blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +		} catch (TimeoutException expected) {
    +			//
     		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(2, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(1, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +		Assert.assertEquals(0, producer.getPendingSize());
    +
    +		snapshotReturnedLatch.await();
    +		snapshotThread.join();
    --- End diff --
    
    I think the latch is redundant.



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Switched to waiting for flushing to start with a latch instead of sleeping.
    Will merge this to `master` and `release-1.2` once Travis turns green (also running the tests locally because of the timeouts).
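    
    The pattern boils down to something like this sketch (the `flushStarted` latch and its trigger point are illustrative names, not the exact code):
    
        final OneShotLatch flushStarted = new OneShotLatch();
        // inside the test producer's flush() override: flushStarted.trigger();
        
        snapshotThread.start();
        flushStarted.await(); // deterministic, unlike Thread.sleep(...)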



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695748
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
     +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
     +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
     +		// extra safety check that the snapshot call is indeed still blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +		} catch (TimeoutException expected) {
    +			//
     		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(2, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(1, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +		Assert.assertEquals(0, producer.getPendingSize());
    +
    +		snapshotReturnedLatch.await();
    +		snapshotThread.join();
    +
     		if (runnableError.f0 != null) {
     			throw runnableError.f0;
     		}
     
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
     
    -	// ------------------------------------------------------------------------
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(false);
     
    -	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    -		private static final long serialVersionUID = 1L;
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
    -		private transient MockProducer prod;
    -		private AtomicBoolean snapshottingFinished;
    +		testHarness.open();
     
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = snapshottingFinished;
    -		}
    +		testHarness.processElement(new StreamRecord<>("msg"));
     
    -		// constructor variant for test irrelated to snapshotting
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = new AtomicBoolean(true);
    -		}
    +		// make sure that all callbacks have not been completed
    +		verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
     
    -		@Override
    -		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
    -			this.prod = new MockProducer();
    -			return this.prod;
    -		}
    +		// should return even if there are pending records
    +		testHarness.snapshot(123L, 123L);
     
    -		@Override
    -		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    -			// call the actual snapshot state
    -			super.snapshotState(ctx);
    -			// notify test that snapshotting has been done
    -			snapshottingFinished.set(true);
    -		}
    +		testHarness.close();
    +	}
     
    -		@Override
    -		protected void flush() {
    -			this.prod.flush();
    -		}
    +	// ------------------------------------------------------------------------
     
    -		public MockProducer getProducerInstance() {
    -			return this.prod;
    -		}
    -	}
    +	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    +		private static final long serialVersionUID = 1L;
    +		
     +		private static final String DUMMY_TOPIC = "dummy-topic";
     
    -	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
    -		List<Callback> pendingCallbacks = new ArrayList<>();
    +		private final KafkaProducer mockProducer;
    --- End diff --
    
    Not serializable --> `transient`
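    
    i.e. something along these lines (sketch; the wildcard type parameters are an assumption):
    
        private final transient KafkaProducer<?, ?> mockProducer;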



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100030073
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    +
    +		inputLatch.await();
    --- End diff --
    
    There is no concurrency in this test. Thus no need to synchronize on anything. The moment you reach this point, `inputLatch` will be triggered.



[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    @tillrohrmann thanks for your second-pass review, the tips you mentioned were helpful. I've incorporated all of your comments except:
    
    > Maybe you could override the DummyFlinkKafkaProducer#flush method to insert some latches to see when you enter and when the method is done. Then you could wait on the first latch and check with the latter whether the method has completed.
    
    I think this is over-complicating things. I don't think it makes sense to add ways to explicitly wait for the `flush` method to complete - it's called in the `snapshotState` method, so isn't it identical to waiting for the snapshot thread to complete?
    
    Instead, I overrode the `snapshotState` method and added a flag inside it to make sure that when the base `snapshotState` implementation returns, it does so only after it has finished calling `flush`. Whether or not `flush` blocks correctly is out of scope for these tests, because `flush` is actually an abstract method that Kafka version-specific concrete subclasses implement.
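    
    A sketch of that override (the flag name is made up; the actual change may differ slightly):
    
        private volatile boolean snapshotReturnedAfterFlush;
        
        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            snapshotReturnedAfterFlush = false;
            super.snapshotState(ctx); // internally calls flush() when flushing is enabled
            // reaching this line means the base implementation finished, flush() included
            snapshotReturnedAfterFlush = true;
        }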



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100035474
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    --- End diff --
    
    1 is enough, will change this.



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100029980
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
     +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
     +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    --- End diff --
    
    Why process `100` elements? Wouldn't `1` be enough?



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695754
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
     +	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
     			@Override
     			public void run() {
     				try {
    -					MockProducer mp = producer.getProducerInstance();
    -					List<Callback> pending = mp.getPending();
    -
    -					// we need to find out if the snapshot() method blocks forever
    -					// this is not possible. If snapshot() is running, it will
    -					// start removing elements from the pending list.
    -					synchronized (threadA) {
    -						threadA.wait(500L);
    -					}
    -					// we now check that no records have been confirmed yet
    -					Assert.assertEquals(100, pending.size());
    -					Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -						snapshottingFinished.get());
    -
    -					// now confirm all checkpoints
    -					for (Callback c: pending) {
    -						c.onCompletion(null, null);
    -					}
    -					pending.clear();
    -				} catch(Throwable t) {
    -					runnableError.f0 = t;
    +					// this should block at first, since there are still two pending records that need to be flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					asyncError.f0 = e;
     				}
     			}
    -		};
    -		Thread threadB = new Thread(confirmer);
    -		threadB.start();
    +		});
    +		snapshotThread.start();
     
    -		// this should block:
    -		testHarness.snapshot(0, 0);
    +		// let the 2nd message fail with an async exception
    +		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -		synchronized (threadA) {
    -			threadA.notifyAll(); // just in case, to let the test fail faster
    -		}
    -		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -		while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -			threadB.join(500);
    +		snapshotThread.join();
    +
    +		// the snapshot should have failed with the async exception
    +		Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message"));
    +	}
    +
    +	/**
    +	 * Test ensuring that the producer is not dropping buffered records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=10000)
    +	public void testAtLeastOnceProducer() throws Throwable {
    +
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +		Assert.assertEquals(3, producer.getPendingSize());
    +
    +		// start a thread to perform checkpointing
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// this should block until all records are flushed
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Throwable e) {
    +					runnableError.f0 = e;
    +				} finally {
    +					snapshotReturnedLatch.trigger();
    +				}
    +			}
    +		});
    +		snapshotThread.start();
    +
    +		// make extra sure that the snapshot call is indeed still blocked
    +		try {
    +			snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +		} catch (TimeoutException expected) {
    +			// expected: the snapshot should still be blocked
     		}
    -		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(2, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +		Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered());
    +		Assert.assertEquals(1, producer.getPendingSize());
    +
    +		producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +		Assert.assertEquals(0, producer.getPendingSize());
    +
    +		snapshotReturnedLatch.await();
    +		snapshotThread.join();
    +
     		if (runnableError.f0 != null) {
     			throw runnableError.f0;
     		}
     
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
     
    -	// ------------------------------------------------------------------------
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(false);
     
    -	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    -		private static final long serialVersionUID = 1L;
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
    -		private transient MockProducer prod;
    -		private AtomicBoolean snapshottingFinished;
    +		testHarness.open();
     
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = snapshottingFinished;
    -		}
    +		testHarness.processElement(new StreamRecord<>("msg"));
     
    -		// constructor variant for test irrelated to snapshotting
    -		@SuppressWarnings("unchecked")
    -		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
    -			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -			this.snapshottingFinished = new AtomicBoolean(true);
    -		}
    +		// the record was sent, but its callback has not been completed
    +		verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
     
    -		@Override
    -		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
    -			this.prod = new MockProducer();
    -			return this.prod;
    -		}
    +		// should return even if there are pending records
    +		testHarness.snapshot(123L, 123L);
     
    -		@Override
    -		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    -			// call the actual snapshot state
    -			super.snapshotState(ctx);
    -			// notify test that snapshotting has been done
    -			snapshottingFinished.set(true);
    -		}
    +		testHarness.close();
    +	}
     
    -		@Override
    -		protected void flush() {
    -			this.prod.flush();
    -		}
    +	// ------------------------------------------------------------------------
     
    -		public MockProducer getProducerInstance() {
    -			return this.prod;
    -		}
    -	}
    +	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
    +		private static final long serialVersionUID = 1L;
    +		
    +		private static final String DUMMY_TOPIC = "dummy-topic";
     
    -	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
    -		List<Callback> pendingCallbacks = new ArrayList<>();
    +		private final KafkaProducer mockProducer;
    +		private final List<Callback> pendingCallbacks = new ArrayList<>();
    --- End diff --
    
    `Callback` is not serializable.
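
    A minimal sketch of one way to address this (hypothetical; the actual fix
    may differ), assuming the callbacks are only needed at runtime and never
    checkpointed: declare the list transient and re-create it in open(). The
    names mirror the test code above:

        // Callback is not Serializable, so keep the list out of the
        // serialized function state by marking it transient ...
        private transient List<Callback> pendingCallbacks;

        @Override
        public void open(Configuration configuration) throws Exception {
            // ... and re-create it before the base class starts sending
            this.pendingCallbacks = new ArrayList<>();
            super.open(configuration);
        }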



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695842
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
     	@Test
     	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
     		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
     		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
     		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
     		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +		
    +		// out-of-order list of 4 partitions
    +		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +		
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +		when(mockProducer.metrics()).thenReturn(null);
     
     		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -			FakeStandardProducerConfig.get(), mockPartitioner);
    +			FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
     		producer.setRuntimeContext(mockRuntimeContext);
     
     		producer.open(new Configuration());
     
    -		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -		// which should be sorted before provided to the custom partitioner's open() method
    +		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
     		int[] correctPartitionList = {0, 1, 2, 3};
     		verify(mockPartitioner).open(0, 1, correctPartitionList);
     	}
     
     	/**
    -	 * Test ensuring that the producer is not dropping buffered records.;
    -	 * we set a timeout because the test will not finish if the logic is broken
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(timeout=5000)
    -	public void testAtLeastOnceProducer() throws Throwable {
    -		runAtLeastOnceTest(true);
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
    +		}
    +
    +		Assert.fail();
     	}
     
     	/**
    -	 * Ensures that the at least once producing test fails if the flushing is disabled
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
     	 */
    -	@Test(expected = AssertionError.class, timeout=5000)
    -	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -		runAtLeastOnceTest(false);
    -	}
    -
    -	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
     		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    -			FakeStandardProducerConfig.get(), null, snapshottingFinished);
    -		producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +			FakeStandardProducerConfig.get(), null, mockProducer);
     
     		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
     		testHarness.open();
     
    -		for (int i = 0; i < 100; i++) {
    -			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		// let the message request return an async exception
    +		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the snapshot should rethrow the async exception
    +			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +			return;
     		}
     
    -		// start a thread confirming all pending records
    -		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -		final Thread threadA = Thread.currentThread();
    +		Assert.fail();
    +	}
     
    -		Runnable confirmer = new Runnable() {
    +	/**
    +	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +	 *
    +	 * Note that this test does not verify that the snapshot method correctly blocks when there are pending records.
    +	 * The test for that is covered in testAtLeastOnceProducer.
    +	 */
    +	@Test(timeout=5000)
    +	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +		KafkaProducer mockProducer = mock(KafkaProducer.class);
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, mockProducer);
    +		producer.setFlushOnCheckpoint(true);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +		testHarness.processElement(new StreamRecord<>("msg-2"));
    +		testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +		// only let the first callback succeed for now
    +		producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +		final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
    --- End diff --
    
    Here you can use a `CheckedThread` instead.
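
    A rough sketch of that suggestion (CheckedThread is Flink's test utility
    whose sync() joins the thread and rethrows any exception thrown from
    go(); the usage below is illustrative, not the final code):

        final CheckedThread snapshotThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                // blocks until all pending records are flushed
                testHarness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();

        // complete / fail the pending callbacks here; sync() then joins
        // the thread and rethrows the async error for the test to assert on
        snapshotThread.sync();

    This makes the Tuple1<Throwable> holder and the manual join unnecessary.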



[GitHub] flink pull request #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should che...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100036011
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
     		testHarness.close();
     	}
     
    +	/**
    +	 * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +	 * the snapshot method does indeed finish without waiting for pending records;
    +	 * we set a timeout because the test will not finish if the logic is broken
    +	 */
    +	@Test(timeout=5000)
    +	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +		final OneShotLatch inputLatch = new OneShotLatch();
    +
    +		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +			FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +		producer.setFlushOnCheckpoint(false);
    +
    +		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +		testHarness.open();
    +
    +		List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +		for (int i = 0; i < 100; i++) {
    +			testHarness.processElement(new StreamRecord<>("msg-" + i));
    +		}
    +
    +		inputLatch.await();
    +
    +		// make sure that all callbacks have not been completed
    +		Assert.assertEquals(100, pending.size());
    +
    +		// use a separate thread to continuously monitor whether snapshotting has returned
    +		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +		Thread snapshotThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					testHarness.snapshot(123L, 123L);
    +				} catch (Exception e) {
    +					runnableError.f0 = e;
    +				}
    +			}
    +		});
    --- End diff --
    
    You're right, not really sure what I was thinking at the time :/ Will change this!
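
    For reference, the reworked version quoted earlier in this thread drops
    the monitoring thread entirely; a minimal sketch of the simplified test
    body (the @Test timeout still guards against a regression that makes
    the snapshot call block):

        testHarness.processElement(new StreamRecord<>("msg"));

        // the record's callback is never completed, yet with flushing
        // disabled the snapshot must return immediately
        testHarness.snapshot(123L, 123L);

        testHarness.close();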

