Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2016/09/29 16:11:37 UTC

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2574

    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

    This fix is quite critical!
    
    While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data arrives from Kafka, the lock is not released until the poll timeout expires.

    During that time, no offset commit can make progress, because it needs the consumer lock. The `notifyCheckpointComplete()` method of the Kafka consumer hence blocks until the poll timeout expires and the lock is released. For low-throughput Kafka topics, this can cause very long checkpoint delays.

    This changes `notifyCheckpointComplete()` to only "schedule" offsets to be committed, while the main fetcher thread actually kicks off the asynchronous offset commits. That way, there is no interference between the `notifyCheckpointComplete()` method (which is executed under the checkpoint lock) and the consumer lock.

    In fact, the only KafkaConsumer method accessed concurrently with the main fetcher thread is `wakeup()`, which is thread-safe (unlike the rest of the KafkaConsumer). The consumer lock could hence be removed completely.
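
    To make the pattern concrete, here is a minimal sketch with assumed names
    (illustrative only, not the actual `Kafka09Fetcher` code): the checkpoint
    thread only hands offsets over and wakes the consumer up, while the fetcher
    thread is the only thread that calls into the consumer.

        import java.util.Map;
        import java.util.concurrent.atomic.AtomicReference;

        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.errors.WakeupException;

        // Hypothetical sketch; class, field, and method names are assumptions.
        class AsyncCommitSketch {

            private final KafkaConsumer<byte[], byte[]> consumer;

            private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
                    new AtomicReference<>();

            private volatile boolean running = true;

            AsyncCommitSketch(KafkaConsumer<byte[], byte[]> consumer) {
                this.consumer = consumer;
            }

            /** Called from the checkpoint thread: only schedules, never blocks. */
            void scheduleOffsetCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
                nextOffsetsToCommit.set(offsets);
                consumer.wakeup();  // the one thread-safe KafkaConsumer method
            }

            /** The fetcher thread owns the consumer exclusively. */
            void runFetchLoop() {
                while (running) {
                    Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
                    if (toCommit != null) {
                        consumer.commitAsync(toCommit, null);  // non-blocking commit
                    }
                    try {
                        consumer.poll(100L);  // blocks until data, timeout, or wakeup()
                    } catch (WakeupException e) {
                        // woken up to pick up a scheduled commit; loop around
                    }
                }
            }
        }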


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2574
    
----
commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331
Author: Stephan Ewen <se...@apache.org>
Date:   2016-09-29T16:09:51Z

    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls
    
    Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take
    a very long time. This is mostly relevant for low-throughput Kafka topics.

----


---

[GitHub] flink issue #2574: [FLINK-4702] [kafka connector] Commit offsets to Kafka as...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2574
  
    @tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up this change and the test case.


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81303182
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     			}
     		}
     
    -		if (this.consumer != null) {
    -			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    -			}
    +		if (commitInProgress) {
    +			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
    +					"Some checkpoints may be subsumed before committed. " +
    +					"This does not compromise Flink's checkpoint integrity.");
    +		}
    --- End diff --
    
    I would not want to put it into the main loop, because then the warning would come repeatedly, every time the poll happens.


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81277067
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    +import org.apache.kafka.common.TopicPartition;
    +
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +
    +import org.mockito.Mockito;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +
    +import static org.mockito.Mockito.any;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Kafka09Fetcher}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(Kafka09Fetcher.class)
    +public class Kafka09FetcherTest {
    +
    +	@Test
    +	public void testCommitDoesNotBlock() throws Exception {
    +
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// to synchronize when the consumer is in its blocking method
    +		final OneShotLatch sync = new OneShotLatch();
    +
    +		// ----- the mock consumer with blocking poll calls ----
    +		final MultiShotLatch blockerLatch = new MultiShotLatch();
    +		
    +		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +			
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
    +				sync.trigger();
    +				blockerLatch.await();
    +				return ConsumerRecords.empty();
    +			}
    +		});
    +
    +		doAnswer(new Answer<Void>() {
    +			@Override
    +			public Void answer(InvocationOnMock invocation) {
    +				blockerLatch.trigger();
    +				return null;
    +			}
    +		}).when(mockConsumer).wakeup();
    +
    +		// make sure the fetcher creates the mock consumer
    +		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = mock(SourceContext.class);
    +		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
    +		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
    +		
    +		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +				sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		final Thread fetcherRunner = new Thread("fetcher runner") {
    +
    +			@Override
    +			public void run() {
    +				try {
    +					fetcher.runFetchLoop();
    +				} catch (Throwable t) {
    +					error.set(t);
    +				}
    +			}
    +		};
    +		fetcherRunner.start();
    +
    +		// wait until the fetcher has reached the method of interest
    +		sync.await();
    +
    +		// ----- trigger the offset commit -----
    +		
    +		final AtomicReference<Throwable> commitError = new AtomicReference<>();
    +		final Thread committer = new Thread("committer runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					fetcher.commitSpecificOffsetsToKafka(testCommitData);
    +				} catch (Throwable t) {
    +					commitError.set(t);
    +				}
    +			}
    +		};
    +		committer.start();
    +
    +		// ----- ensure that the committer finishes in time  -----
    +		committer.join(30000);
    +		assertFalse("The committer did not finish in time", committer.isAlive());
    +
    +		// ----- test done, wait till the fetcher is done for a clean shutdown -----
    +		fetcher.cancel();
    +		fetcherRunner.join();
    +
    +		// check that there were no errors in the fetcher
    +		final Throwable caughtError = error.get();
    +		if (caughtError != null) {
    +			throw new Exception("Exception in the fetcher", caughtError);
    +		}
    --- End diff --
    
    Might as well also check `commitError`?


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/2574


---

[GitHub] flink issue #2574: [FLINK-4702] [kafka connector] Commit offsets to Kafka as...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2574
  
    Merged in 92f4539afc714f7dbd293c3ad677b3b5807c6911
    
    Addresses @tzulitai's comments. Fixed the warning log message by using an atomic swap when setting the next offsets to be committed: if the swap returns a non-null value, that value was never committed and is skipped, subsumed by the newer offsets.
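
    For illustration, the swap can look roughly like this (a sketch with
    assumed names, not the merged code):

        // Scheduling side: getAndSet atomically publishes the new offsets and
        // returns the previously scheduled ones. A non-null return means that
        // request was never picked up by the fetcher thread and is now
        // subsumed by the newer offsets.
        Map<TopicPartition, OffsetAndMetadata> subsumed = nextOffsetsToCommit.getAndSet(newOffsets);
        if (subsumed != null) {
            LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
                    "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
                    "This does not compromise Flink's checkpoint integrity.");
        }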


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81303617
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    [... license header, imports, and javadoc identical to the first quote of Kafka09FetcherTest.java above ...]
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(Kafka09Fetcher.class)
    +public class Kafka09FetcherTest {
    --- End diff --
    
    It is already integrated in the test, actually. Since the mock consumer's `poll()` call blocks forever and only wakes up on `wakeup()`, the test fails if `wakeup()` is not called.


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81275213
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     			}
     		}
     
    -		if (this.consumer != null) {
    -			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    -			}
    +		if (commitInProgress) {
    +			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
    +					"Some checkpoints may be subsumed before committed. " +
    +					"This does not compromise Flink's checkpoint integrity.");
    +		}
    --- End diff --
    
    Would it make sense to simply move this warning to the `if (toCommit != null && !commitInProgress)` block in the main thread? That's where `commitInProgress` actually determines whether the current offsets to commit will be dropped. Also, the actual committing should happen right afterwards anyway because of `consumer.wakeup()`, so I don't see the purpose of an eager warning here.
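
    For reference, a sketch of the main-thread block being discussed (the
    condition is quoted from the change; the surrounding lines are assumptions):

        // Inside the fetcher's run loop:
        if (toCommit != null && !commitInProgress) {
            commitInProgress = true;                   // reset by the commit callback
            consumer.commitAsync(toCommit, callback);  // kicked off right after wakeup()
            toCommit = null;
        } else if (toCommit != null) {
            // the suggestion above: warn here instead, where a still-running
            // commit actually leaves the scheduled offsets pending
            LOG.warn("Previous offset commit still in progress; offsets left pending.");
        }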


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81303450
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     			}
     		}
     
    -		if (this.consumer != null) {
    -			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    -			}
    +		if (commitInProgress) {
    +			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
    +					"Some checkpoints may be subsumed before committed. " +
    +					"This does not compromise Flink's checkpoint integrity.");
    +		}
    --- End diff --
    
    Also, in both cases, you cannot know whether it is dropped: it may be that only one commit was delayed and the `toCommit` data will still be picked up, just with a delay.

    I have an idea, though, for how we can check that properly.


---

[GitHub] flink issue #2574: [FLINK-4702] [kafka connector] Commit offsets to Kafka as...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2574
  
    Thanks for the review. I think some of the points you mentioned are already addressed, actually.

    Will add the following:
      - `commitSpecificOffsets()` can determine precisely when a commit request is subsumed by a new one
      - Extend the test to catch errors from the committer thread


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81302867
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    [... diff context identical to the first quote of Kafka09FetcherTest.java above ...]
    +		// check that there were no errors in the fetcher
    +		final Throwable caughtError = error.get();
    +		if (caughtError != null) {
    +			throw new Exception("Exception in the fetcher", caughtError);
    +		}
    --- End diff --
    
    Yes, should and will do that!


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81303671
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    [... license header, imports, and test setup identical to the first quote of Kafka09FetcherTest.java above ...]
    +		// to synchronize when the consumer is in its blocking method
    +		final OneShotLatch sync = new OneShotLatch();
    +
    +		// ----- the mock consumer with blocking poll calls ----
    +		final MultiShotLatch blockerLatch = new MultiShotLatch();
    +		
    +		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +			
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
    +				sync.trigger();
    +				blockerLatch.await();
    --- End diff --
    
    See above.


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81277361
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    [... license header, imports, and javadoc identical to the first quote of Kafka09FetcherTest.java above ...]
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(Kafka09Fetcher.class)
    +public class Kafka09FetcherTest {
    --- End diff --
    
    Should we also add a test to make sure that `wakeup()` is called on the `KafkaConsumer` immediately in `commitSpecificOffsetsToKafka`? Otherwise we are not ensuring the behaviour of "committing offsets back to Kafka on checkpoints".

    Perhaps this can be integrated into `testCommitDoesNotBlock()`.
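
    For reference, one way to express that check with the mocks already in the
    test (illustrative; the `verify` usage is an assumption, not taken from the
    final test code):

        import static org.mockito.Mockito.atLeastOnce;
        import static org.mockito.Mockito.verify;

        // after fetcher.commitSpecificOffsetsToKafka(testCommitData) returns:
        verify(mockConsumer, atLeastOnce()).wakeup();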


---

[GitHub] flink pull request #2574: [FLINK-4702] [kafka connector] Commit offsets to K...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2574#discussion_r81282102
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -0,0 +1,300 @@
    [... license header, imports, and test setup identical to the first quote of Kafka09FetcherTest.java above ...]
    +		// to synchronize when the consumer is in its blocking method
    +		final OneShotLatch sync = new OneShotLatch();
    +
    +		// ----- the mock consumer with blocking poll calls ----
    +		final MultiShotLatch blockerLatch = new MultiShotLatch();
    +		
    +		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +			
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
    +				sync.trigger();
    +				blockerLatch.await();
    --- End diff --
    
    We are not ensuring that `blockerLatch` is ever released here, correct? As in my comment above, perhaps we should check that, to ensure that `wakeup()` is called in `commitSpecificOffsetsToKafka`.


---