Posted to issues@flink.apache.org by snuyanzin <gi...@git.apache.org> on 2018/05/17 16:47:34 UTC

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

GitHub user snuyanzin opened a pull request:

    https://github.com/apache/flink/pull/6040

    [FLINK-9349][Kafka Connector] KafkaConnector Exception while fetching from multiple kafka topics

    ## What is the purpose of the change
    
    *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
    
    
    ## Brief change log
    
    Fix the synchronization issue (by using `CopyOnWriteArrayList` for the partition states)
    + add a test to verify it
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    via org.apache.flink.streaming.connectors.kafka.internal.Flink9349Test#testConcurrentPartitionsDiscoveryAndLoopFetching
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/snuyanzin/flink FLINK-9349_KafkaConnector_Exception_while_fetching_from_multiple_kafka_topics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6040
    
----
commit 8de5f37549607460659e171f9c3b48d0090383c0
Author: snuyanzin <sn...@...>
Date:   2018-05-17T16:12:04Z

    added test and fix for FLINK-9349 by usage of CopyOnWriteArrayList

commit eee524e2d2a86af5252ed939000c12a2604917e9
Author: snuyanzin <sn...@...>
Date:   2018-05-17T16:35:10Z

    fix checkstyle

----


---

[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    @snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore that for now. I'll take another look at your changes soon. Thanks!


---

[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    Using `CheckedThread` is preferable, as it simplifies some of the test code.
    But yes, the utility was introduced at a later point in time in Flink, so some parts of the test code might still be using `Thread`s and `AtomicReference`s.
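
To illustrate the comparison above, here is a minimal sketch of the two patterns side by side. `SimpleCheckedThread` is a hypothetical, simplified stand-in written for this example and is not Flink's actual `CheckedThread` class; the method names `go()` and `sync()` are assumptions chosen for the illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

final class CheckedThreadSketch {

    /** Hypothetical, simplified stand-in for a checked test thread; not Flink's actual class. */
    abstract static class SimpleCheckedThread extends Thread {
        private volatile Throwable error;

        /** The test logic; may throw. */
        abstract void go() throws Exception;

        @Override
        public final void run() {
            try {
                go();
            } catch (Throwable t) {
                error = t; // remember the failure for sync()
            }
        }

        /** Waits for the thread to finish and rethrows anything it threw, wrapped. */
        void sync() throws Exception {
            join();
            if (error != null) {
                throw new Exception("Thread failed: " + error.getMessage(), error);
            }
        }
    }

    /** Manual pattern: a plain Thread plus an AtomicReference to carry the failure out. */
    static Throwable runWithAtomicReference(Runnable body) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable e) {
                error.set(e);
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return error.get();
    }

    /** Same check with the helper: the failure surfaces from sync(). Returns the cause, or null. */
    static Throwable runWithCheckedThread(Runnable body) {
        SimpleCheckedThread t = new SimpleCheckedThread() {
            @Override
            void go() {
                body.run();
            }
        };
        t.start();
        try {
            t.sync();
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } catch (Exception e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        System.out.println(runWithAtomicReference(failing));
        System.out.println(runWithCheckedThread(failing));
    }
}
```

With the helper, the test body no longer needs its own try/catch and shared error holder; the bookkeeping lives in one place.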


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190110928
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    --- End diff --
    
    It is unnecessary to use a power mock here. A dummy implementation of a `SourceContext` will be better.
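
As a rough illustration of the suggestion above, a hand-written dummy can be as small as the sketch below. `MiniSourceContext` is a hypothetical, trimmed-down interface defined only for this example (Flink's real `SourceContext` has more methods), and `CollectingContext` is likewise an assumed name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DummyContextSketch {

    /** Hypothetical, trimmed-down stand-in for a source context; not Flink's interface. */
    interface MiniSourceContext<T> {
        void collect(T element);

        Object getCheckpointLock();
    }

    /** Hand-written dummy: records what was emitted, no mocking framework involved. */
    static final class CollectingContext<T> implements MiniSourceContext<T> {
        private final Object lock = new Object();
        private final List<T> collected = new ArrayList<>();

        @Override
        public void collect(T element) {
            synchronized (lock) {
                collected.add(element);
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }

        /** Returns a copy of everything emitted so far. */
        List<T> getCollected() {
            synchronized (lock) {
                return new ArrayList<>(collected);
            }
        }
    }

    /** Emits all records through the dummy and returns what it recorded. */
    static List<String> emitAll(List<String> records) {
        CollectingContext<String> context = new CollectingContext<>();
        for (String record : records) {
            context.collect(record);
        }
        return context.getCollected();
    }

    public static void main(String[] args) {
        System.out.println(emitAll(Arrays.asList("a", "b")));
    }
}
```

A dummy like this keeps the test free of a special runner and bytecode manipulation, and its behavior is visible in the test source itself.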


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190114844
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +
    +		final TestFetcher<String> fetcher = new TestFetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic assigner */
    +			null, /* punctuated assigner */
    +			new TestProcessingTimeService(),
    +			10);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 5;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    --- End diff --
    
    The sequence here seems a bit odd to me.
    
    I think we should be testing this as follows:
    1. Run the fetch loop, and let it be blocked on record emitting (which then should let it be blocked mid-iteration)
    2. Add a discovered partition; this should not throw an exception.
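
The two steps above can be sketched without any Flink classes. This is only an illustration under stated assumptions: a plain `List<String>` stands in for the fetcher's partition states, and two `CountDownLatch` instances stand in for "blocked on record emitting"; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class BlockedIterationSketch {

    /**
     * Starts a loop over the list that blocks inside its first iteration, adds an element
     * from the calling thread, then lets the loop continue.
     * Returns whatever the loop thread threw, or null if it finished cleanly.
     */
    static Throwable iterateWhileAdding(List<String> partitions) {
        CountDownLatch insideLoop = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);
        AtomicReference<Throwable> error = new AtomicReference<>();

        Thread loop = new Thread(() -> {
            try {
                for (String ignored : partitions) {
                    insideLoop.countDown(); // step 1: the loop is now mid-iteration
                    resume.await();         // blocked, as if emitting a record
                }
            } catch (Throwable t) {
                error.set(t);
            }
        }, "fetch loop (sketch)");
        loop.start();

        try {
            insideLoop.await();
            partitions.add("discovered-partition"); // step 2: add while the loop is blocked
            resume.countDown();
            loop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding(new LinkedList<>(Arrays.asList("p0"))));
        System.out.println(iterateWhileAdding(new CopyOnWriteArrayList<>(Arrays.asList("p0"))));
    }
}
```

Because the latches force the add to happen while the iterator is live, the outcome does not depend on thread timing: a `LinkedList` fails on the iterator's next step, while a `CopyOnWriteArrayList` completes.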


---

[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

Posted by snuyanzin <gi...@git.apache.org>.
Github user snuyanzin commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    @tzulitai, @tedyu thank you for your review, comments and contribution tips.
    I pushed updates that move the test into `AbstractFetcherTest` and make it independent of the Kafka connector version.
    
    Could you please help me a bit?
    The Travis build suddenly failed on `YARNSessionCapacitySchedulerITCase` (only on the Flink Travis; on my fork it passed several times). It does not look like a result of my changes, as nothing in them is related to YARN. I tried to investigate it anyway and found several similar issues in JIRA, but they are all closed.
    
    I also downloaded the logs mentioned in the failed Travis job:
    
    > Uploading to transfer.sh
    https://transfer.sh/JspTz/24547.10.tar.gz
    
    Based on them, it looks like there was a connectivity issue with one of the ApplicationMasters,
    as the log yarn-tests/container_1526608500321_0007_01_000001/job-manager.log is full of
    > 2018-05-18 01:56:49,448 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with java.net.ConnectException: Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980
    2018-05-18 01:56:49,449 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980]] Caused by: [Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980]
    
    One very strange thing is:
    
    > Remote connection to [null] 



---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by snuyanzin <gi...@git.apache.org>.
Github user snuyanzin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189168596
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// to synchronize when the consumer is in its blocking method
    +		final OneShotLatch sync = new OneShotLatch();
    +
    +		// ----- the mock consumer with blocking poll calls ----
    +		final MultiShotLatch blockerLatch = new MultiShotLatch();
    +
    +		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
    +				sync.trigger();
    +				blockerLatch.await();
    +				return ConsumerRecords.empty();
    +			}
    +		});
    +
    +		doAnswer(new Answer<Void>() {
    +			@Override
    +			public Void answer(InvocationOnMock invocation) {
    +				blockerLatch.trigger();
    +				return null;
    +			}
    +		}).when(mockConsumer).wakeup();
    +
    +		// make sure the fetcher creates the mock consumer
    +		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceFunction.SourceContext<String> sourceContext = mock(SourceFunction.SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +
    +		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic watermark extractor */
    +			null, /* punctuated watermark extractor */
    +			new TestProcessingTimeService(),
    +			10, /* watermark interval */
    +			this.getClass().getClassLoader(),
    +			"task_name",
    +			schema,
    +			new Properties(),
    +			0L,
    +			new UnregisteredMetricsGroup(),
    +			new UnregisteredMetricsGroup(),
    +			false);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 2;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner ") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    +				} catch (Throwable t) {
    +					error.set(t);
    +				}
    +			}
    +		});
    +		for (int i = 0; i < fetchTasks; i++) {
    +			service.submit(new Thread("add partitions " + i) {
    +
    +				@Override
    +				public void run() {
    +					try {
    +						List<KafkaTopicPartition> newPartitions = new ArrayList<>();
    +						for (int i = 0; i < 1000; i++) {
    +							newPartitions.add(testPartition);
    +						}
    +						fetcher.addDiscoveredPartitions(newPartitions);
    +						latch.countDown();
    +						//latch.await();
    +						for (int i = 0; i < 100; i++) {
    +							fetcher.addDiscoveredPartitions(newPartitions);
    +							Thread.sleep(1L);
    +						}
    +					} catch (Throwable t) {
    +						error.set(t);
    +					}
    +				}
    +			});
    +		}
    +
    +		service.awaitTermination(1L, TimeUnit.SECONDS);
    +
    +		// wait until the fetcher has reached the method of interest
    +		sync.await();
    +
    +		// ----- trigger the offset commit -----
    --- End diff --
    
    Thank you, you are right.
    I removed it, along with some other unnecessary code, from this test.


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190111529
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -416,9 +520,16 @@ protected TestFetcher(
     				false);
     		}
     
    +		/**
    +		 * Emulation of partition's iteration which is required for
    +		 * {@link AbstractFetcherTest#testConcurrentPartitionsDiscoveryAndLoopFetching}.
    +		 * @throws Exception
    +		 */
     		@Override
     		public void runFetchLoop() throws Exception {
    -			throw new UnsupportedOperationException();
    +			for (KafkaTopicPartitionState ignored: subscribedPartitionStates()) {
    +				Thread.sleep(10L);
    --- End diff --
    
    This would only let the test fail "occasionally", right?
    I would like this to be changed, so that we always have the test failing without the copy on write fix.
    We could do this by having a dummy source context that blocks on record emit.


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6040


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189031464
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// to synchronize when the consumer is in its blocking method
    +		final OneShotLatch sync = new OneShotLatch();
    +
    +		// ----- the mock consumer with blocking poll calls ----
    +		final MultiShotLatch blockerLatch = new MultiShotLatch();
    +
    +		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
    +				sync.trigger();
    +				blockerLatch.await();
    +				return ConsumerRecords.empty();
    +			}
    +		});
    +
    +		doAnswer(new Answer<Void>() {
    +			@Override
    +			public Void answer(InvocationOnMock invocation) {
    +				blockerLatch.trigger();
    +				return null;
    +			}
    +		}).when(mockConsumer).wakeup();
    +
    +		// make sure the fetcher creates the mock consumer
    +		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceFunction.SourceContext<String> sourceContext = mock(SourceFunction.SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +
    +		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic watermark extractor */
    +			null, /* punctuated watermark extractor */
    +			new TestProcessingTimeService(),
    +			10, /* watermark interval */
    +			this.getClass().getClassLoader(),
    +			"task_name",
    +			schema,
    +			new Properties(),
    +			0L,
    +			new UnregisteredMetricsGroup(),
    +			new UnregisteredMetricsGroup(),
    +			false);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 2;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner ") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    +				} catch (Throwable t) {
    +					error.set(t);
    +				}
    +			}
    +		});
    +		for (int i = 0; i < fetchTasks; i++) {
    +			service.submit(new Thread("add partitions " + i) {
    +
    +				@Override
    +				public void run() {
    +					try {
    +						List<KafkaTopicPartition> newPartitions = new ArrayList<>();
    +						for (int i = 0; i < 1000; i++) {
    +							newPartitions.add(testPartition);
    +						}
    +						fetcher.addDiscoveredPartitions(newPartitions);
    +						latch.countDown();
    +						//latch.await();
    +						for (int i = 0; i < 100; i++) {
    +							fetcher.addDiscoveredPartitions(newPartitions);
    +							Thread.sleep(1L);
    +						}
    +					} catch (Throwable t) {
    +						error.set(t);
    +					}
    +				}
    +			});
    +		}
    +
    +		service.awaitTermination(1L, TimeUnit.SECONDS);
    +
    +		// wait until the fetcher has reached the method of interest
    +		sync.await();
    +
    +		// ----- trigger the offset commit -----
    --- End diff --
    
    I don't think this is required for the scope of interest of this test, is it?


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189036753
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
     			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
     			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
     
    -		List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
    +		List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
    --- End diff --
    
    The explanation can be given in a code comment.
    There is no need to link to the issue comment.
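
As an illustration of what such a code comment might explain, the sketch below shows the snapshot behavior of `CopyOnWriteArrayList` iterators in a single thread. The class name, method name and element values are made up for the example, and the comment wording is only a suggestion, not the text used in the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SnapshotIterationSketch {

    /** Iterates over the list, adding one element per visited entry; returns what was visited. */
    static List<String> visitWhileAdding(List<String> states) {
        List<String> visited = new ArrayList<>();
        // With a CopyOnWriteArrayList, the iterator works on a snapshot of the backing array
        // taken when the loop starts, so add() during iteration neither fails the loop
        // nor becomes visible to it.
        for (String state : states) {
            visited.add(state);
            states.add("discovered-after-" + state);
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> states =
            new CopyOnWriteArrayList<>(Arrays.asList("partition-0", "partition-1"));
        System.out.println(visitWhileAdding(states));
        System.out.println(states.size());
    }
}
```

The trade-off is that every write copies the backing array, which is usually acceptable when writes (such as partition discovery) are rare and iteration is frequent.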


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189031751
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    --- End diff --
    
    Likewise, Kafka 08 / 09 / 010 / 011 should all have this test coverage.


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by snuyanzin <gi...@git.apache.org>.
Github user snuyanzin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189035401
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
     			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
     			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
     
    -		List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
    +		List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
    --- End diff --
    
    Yes, you are right. One question: is it acceptable to link to the issue comment where it was decided to use CopyOnWriteArrayList, or is it better to keep the explanation in a code comment only?


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190111239
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +
    +		final TestFetcher<String> fetcher = new TestFetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic assigner */
    +			null, /* punctuated assigner */
    +			new TestProcessingTimeService(),
    +			10);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 5;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    +				} catch (Throwable t) {
    +					error.set(t);
    +				}
    +			}
    +		});
    +
    +		for (int i = 0; i < fetchTasks; i++) {
    +			service.submit(new Thread("add partitions " + i) {
    +				@Override
    +				public void run() {
    +					try {
    +						List<KafkaTopicPartition> newPartitions = new ArrayList<>();
    +						for (int i = 0; i < 1000; i++) {
    +							newPartitions.add(testPartition);
    +						}
    +						fetcher.addDiscoveredPartitions(newPartitions);
    +						latch.countDown();
    +						for (int i = 0; i < 100; i++) {
    +							fetcher.addDiscoveredPartitions(newPartitions);
    +							Thread.sleep(1L);
    +						}
    +					} catch (Throwable t) {
    +						error.set(t);
    +					}
    +				}
    +			});
    +		}
    +
    +		service.awaitTermination(1L, TimeUnit.SECONDS);
    +
    +		// ----- trigger the offset commit -----
    --- End diff --
    
    We should be able to ignore offset commit triggering in this test


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by snuyanzin <gi...@git.apache.org>.
Github user snuyanzin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189038623
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    --- End diff --
    
    Do you mean adding such a test as a separate method in each KafkaXYFetcherTest class?



---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190111078
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +
    +		final TestFetcher<String> fetcher = new TestFetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic assigner */
    +			null, /* punctuated assigner */
    +			new TestProcessingTimeService(),
    +			10);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    --- End diff --
    
    Flink provides a `CheckedThread` utility so you don't have to do this thread error referencing.
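    
    For readers unfamiliar with the pattern, here is a minimal self-contained sketch of what a utility of this kind does. It is not Flink's `CheckedThread`; the class `ErrorCapturingThread` and the method `runAndReport` are hypothetical names used only for illustration. The idea is that the thread records whatever its body throws and rethrows it from `sync()`, so the test needs no separate `AtomicReference`.
    
    ```java
    public class CheckedThreadSketch {
    
        // Hypothetical stand-in, NOT Flink's CheckedThread: remembers the
        // Throwable thrown by go() and rethrows it from sync().
        public abstract static class ErrorCapturingThread extends Thread {
            private volatile Throwable error;
    
            public abstract void go() throws Exception;
    
            @Override
            public final void run() {
                try {
                    go();
                } catch (Throwable t) {
                    error = t;
                }
            }
    
            // Waits for the thread to finish and surfaces any captured failure.
            public void sync() throws Exception {
                join();
                if (error instanceof Exception) {
                    throw (Exception) error;
                }
                if (error instanceof Error) {
                    throw (Error) error;
                }
            }
        }
    
        // Runs a failing body and returns the message that sync() rethrows.
        public static String runAndReport() {
            ErrorCapturingThread thread = new ErrorCapturingThread() {
                @Override
                public void go() throws Exception {
                    throw new IllegalStateException("boom");
                }
            };
            thread.start();
            try {
                thread.sync();
                return "no error";
            } catch (Exception e) {
                return e.getMessage();
            }
        }
    
        public static void main(String[] args) {
            System.out.println(runAndReport());
        }
    }
    ```
    
    With this shape, a failure inside the background thread fails the test at the `sync()` call instead of being silently dropped.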


---

[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

Posted by snuyanzin <gi...@git.apache.org>.
Github user snuyanzin commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    @tzulitai thank you for your review and comments.
    Based on your comments, I have a question. Could you please clarify it?
    
    You mentioned Flink's `OneShotLatch` and `CheckedThread`; at the same time, some Kafka connector tests use `AtomicReference`, `Thread`, etc. (I used one of them as an example while writing my version of the test). Just to be on the safe side: am I right that `OneShotLatch` and `CheckedThread` are preferred in tests, or are there some rules or limitations?


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190120134
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +
    +		final TestFetcher<String> fetcher = new TestFetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic assigner */
    +			null, /* punctuated assigner */
    +			new TestProcessingTimeService(),
    +			10);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 5;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    --- End diff --
    
    So, IMO, the test should look something like this:
    
    ```
    		final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
    		final OneShotLatch stateIterationBlockLatch = new OneShotLatch();
    
    		final TestFetcher<String> fetcher = new TestFetcher<>(
    			sourceContext,
    			partitionsWithInitialOffsets,
    			null, /* periodic assigner */
    			null, /* punctuated assigner */
    			new TestProcessingTimeService(),
    			10,
    			fetchLoopWaitLatch,
    			stateIterationBlockLatch);
    
    		// ----- run the fetcher -----
    
    		final CheckedThread checkedThread = new CheckedThread() {
    			@Override
    			public void go() throws Exception {
    				fetcher.runFetchLoop();
    			}
    		};
    		checkedThread.start();
    
    		// wait until state iteration begins before adding discovered partitions
    		fetchLoopWaitLatch.await();
    		fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));
    
    		stateIterationBlockLatch.trigger();
    		checkedThread.sync();
    ```


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190112933
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,11 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
     			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
     			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
     
    -		List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
    +		/**
    +		 *  CopyOnWrite as adding discovered partitions could happen in parallel
    +		 *  with different threads iterating by {@link AbstractFetcher#subscribedPartitionStates} results
    +		 */
    --- End diff --
    
    I think we usually don't have Javadoc blocks within methods. A regular comment with `//` would do.


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189031075
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    --- End diff --
    
    I think we should have a similar test, but move it to `Kafka09FetcherTest`.


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189029077
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
     			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
     			ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
     
    -		List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
    +		List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
    --- End diff --
    
    Would be nice to have a comment on why we need to use a `CopyOnWriteArrayList`
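    
    As background for such a comment, the following standalone sketch (plain JDK only, no Flink classes; the class and method names are made up for illustration) shows the difference in iterator behavior. A `LinkedList` iterator is fail-fast and throws once the list is structurally modified during iteration, while a `CopyOnWriteArrayList` iterator walks a snapshot and is unaffected by later additions.
    
    ```java
    import java.util.ConcurrentModificationException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class IterationWhileAddingSketch {
    
        // Appends an element while iterating over the list.
        // Returns true if the iterator detected the modification.
        public static boolean failsWhenModifiedDuringIteration(List<String> states) {
            states.add("partition-0");
            states.add("partition-1");
            try {
                for (String state : states) {
                    if (state.equals("partition-0")) {
                        // Stands in for a newly discovered partition being added.
                        states.add("discovered-partition");
                    }
                }
                return false;
            } catch (ConcurrentModificationException e) {
                return true;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(failsWhenModifiedDuringIteration(new LinkedList<>()));           // true
            System.out.println(failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<>())); // false
        }
    }
    ```
    
    The trade-off is that every write to a `CopyOnWriteArrayList` copies the backing array, which is usually acceptable when additions are rare compared to iterations.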


---

[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    Thanks for the PR @snuyanzin!
    I had some comments, please let me know what you think.
    
    Also, some general contribution tips:
    1. I would suggest the title of the PR to be something along the lines of "[FLINK-9349] [kafka] Fix ConcurrentModificationException when adding discovered partitions". That directly makes it clear what exactly is being fixed.
    2. The message of the first commit of the PR should also be appropriately set to be similar to the title (most of the time if it is a 1-commit PR, the title of the PR and the commit message can be identical).


---

[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r190120209
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---
    @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
     		assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     	}
     
    +	@Test
    +	public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
    +		// test data
    +		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
    +
    +		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
    +		testCommitData.put(testPartition, 11L);
    +
    +		// ----- create the test fetcher -----
    +
    +		@SuppressWarnings("unchecked")
    +		SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +
    +		final TestFetcher<String> fetcher = new TestFetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic assigner */
    +			null, /* punctuated assigner */
    +			new TestProcessingTimeService(),
    +			10);
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		int fetchTasks = 5;
    +		final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +		ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +		service.submit(new Thread("fetcher runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					latch.await();
    +					fetcher.runFetchLoop();
    --- End diff --
    
    The final `checkedThread.sync()` would always fail with the `ConcurrentModificationException` if the test is designed like this.
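    
    To illustrate why a latch-controlled interleaving makes the failure deterministic rather than timing-dependent, here is a self-contained sketch using only JDK classes. It is an assumption-laden analogy, not the actual fetcher test: `iterateWhileAdding` and the latch names are hypothetical, and a plain list stands in for the fetcher's partition states. One thread blocks in the middle of an iteration until the other thread has added an element.
    
    ```java
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;
    
    public class LatchedIterationSketch {
    
        // Returns the Throwable raised by the iterating thread, or null if none.
        public static Throwable iterateWhileAdding(final List<String> states) {
            states.add("initial-partition");
            final CountDownLatch iterationStarted = new CountDownLatch(1);
            final CountDownLatch partitionAdded = new CountDownLatch(1);
            final AtomicReference<Throwable> error = new AtomicReference<>();
    
            Thread iterating = new Thread(() -> {
                try {
                    for (String ignored : states) {
                        iterationStarted.countDown(); // iteration is now in progress
                        partitionAdded.await();       // block until the list was modified
                    }
                } catch (Throwable t) {
                    error.set(t);
                }
            });
            iterating.start();
    
            try {
                iterationStarted.await();
                states.add("discovered-partition");
                partitionAdded.countDown();
                iterating.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return error.get();
        }
    
        public static void main(String[] args) {
            System.out.println(iterateWhileAdding(new LinkedList<>()));
            System.out.println(iterateWhileAdding(new CopyOnWriteArrayList<>()));
        }
    }
    ```
    
    Because the latches force the addition to happen while the iterator is open, the `LinkedList` run reports a `ConcurrentModificationException` on every execution, and the `CopyOnWriteArrayList` run reports no error.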


---