Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/04/20 15:26:39 UTC

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3746

    [FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer

    This PR adds the internals required to support partition and topic discovery (via regex topic patterns) in the `FlinkKafkaConsumer`.
    
    It doesn't yet expose a new constructor that accepts regex topic patterns. I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 (deprecate the original FlinkKafkaConsumer constructors in favor of new ones with new offset behaviours). For the same reason, I also propose to update the Kafka documentation once the new constructors are added.
    
    ## Design
    
    Some description to ease review:
    
    - `AbstractPartitionDiscoverer`:
    An `AbstractPartitionDiscoverer` is a stateful utility that remembers which partitions have already been discovered. It also wraps the logic for partition-to-subtask assignment. The main `run()` method now has a discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` at a fixed interval. This method returns only the new partitions that the subtask should subscribe to (see the sketch after this list).
    The returned partitions are used to invoke `AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
    On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is also used in `open()` to fetch the initial seed partitions.
    
    - `AbstractFetcher#addDiscoveredPartitions(...)`:
    The fetcher now has an `unassignedPartitionsQueue` that contains discovered partitions not yet assigned to a Kafka consumer for reading. Whenever `addDiscoveredPartitions(...)` is called on the fetcher, the fetcher creates the state holders for the partitions and adds the partitions to the queue.
    Concrete implementations of the fetcher should continuously poll this queue in the fetch loop. Any partitions found in the queue should be assigned for consuming.
    
    - Concrete fetchers continuously poll the queue in `runFetchLoop()`:
    For 0.8, this simply means that the original `unassignedPartitionsQueue` in `Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is touched.
    For 0.9+, queue polling and partition reassignment for the high-level consumer happen in `KafkaConsumerThread`.
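
    As a rough illustration of the discovery loop and the queue hand-off described above, here is a minimal sketch. The interfaces and `discoveryIntervalMillis` are simplified stand-ins, not the PR's actual classes:

    ```
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Stand-in for AbstractPartitionDiscoverer: remembers what was already
    // discovered and returns only new partitions this subtask should subscribe to.
    interface PartitionDiscoverer {
        List<String> discoverPartitions() throws Exception;
    }

    // Stand-in for AbstractFetcher: creates state holders for new partitions
    // and enqueues them on the unassignedPartitionsQueue for the fetch loop.
    interface Fetcher {
        void addDiscoveredPartitions(List<String> newPartitions);
    }

    class DiscoveryLoopSketch {
        static void runDiscoveryLoop(
                PartitionDiscoverer discoverer,
                Fetcher fetcher,
                long discoveryIntervalMillis,
                AtomicBoolean running) throws Exception {
            while (running.get()) {
                // only partitions newly discovered since the last call are returned
                List<String> newPartitions = discoverer.discoverPartitions();
                if (!newPartitions.isEmpty()) {
                    fetcher.addDiscoveredPartitions(newPartitions);
                }
                Thread.sleep(discoveryIntervalMillis);
            }
        }
    }
    ```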
    
    ## Limitations
    
    For partition discovery to work properly after a restore, the previous state had to be migrated to union list state (`OperatorStateStore#getUnionListState()`).
    
    One limitation that comes with this migration is that if the current run was restored from old state (Flink 1.1 / 1.2), then partition discovery is disabled. To use partition discovery, the user must take a manual snapshot (which uses the new union state) and restore from it. A sketch of the union state registration follows.
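
    As a rough sketch of that union state registration, mirroring the `initializeState()` change in this PR (the state name string below is hypothetical; the actual code uses an `OFFSETS_STATE_NAME` constant):

    ```
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.OperatorStateStore;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    class UnionStateSketch {
        // hypothetical name; the PR uses the OFFSETS_STATE_NAME constant
        private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

        private ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

        public void initializeState(FunctionInitializationContext context) throws Exception {
            OperatorStateStore stateStore = context.getOperatorStateStore();

            // with union list state, every subtask sees the complete set of all
            // subtasks' (partition, offset) entries on restore, so discovered or
            // redistributed partitions can be matched against restored offsets
            this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME,
                    TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
        }
    }
    ```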

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3746
    
----
commit 67c3b872cc0129b2f87b565e9fb92908926eac64
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-04-20T09:17:43Z

    [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-04-20T15:08:39Z

    [FLINK-4022] Migrate to union list state

----


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114795536
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -219,6 +209,15 @@ public void run() {
     					}
     				}
     
    +				try {
    +					newPartitions = unassignedPartitionsQueue.pollBatch();
    +					if (newPartitions != null) {
    +						reassignPartitions(newPartitions);
    +					}
    +				} catch (AbortedReassignmentException e) {
    +					continue;
    --- End diff --
    
    The reason is that the `AbortedReassignmentException` is thrown only if the Kafka consumer was woken up (via `wakeup()`) before the reassignment started, and that wakeup effect was then consumed by an operation during the reassignment.
    
    Now, if the loop just continues, the next Kafka operation (the `poll`) would not be woken up because the wakeup was already consumed.
    So, effectively, it would mean the same thing to do this in the catch block instead of falling through the loop:
    ```
    } catch (AbortedReassignmentException e) {
        // restore the wakeup consumed during the aborted reassignment, so the
        // next blocking Kafka operation (the poll) returns immediately
        consumer.wakeup();
    }
    ```
    
    I'm just eagerly continuing instead of restoring the wakeup. Does that make sense?
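
    To make the equivalence concrete, here is a rough sketch of the loop shape, with simplified stand-in types; this is not the actual `KafkaConsumerThread` code:

    ```
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Simplified stand-ins for the PR's types; only the control flow matters here.
    class AbortedReassignmentException extends Exception {}

    interface ConsumerLike {
        void wakeup();
        void poll(long timeoutMillis) throws Exception;
    }

    interface PartitionQueue<T> {
        List<T> pollBatch();
    }

    class FetchLoopSketch<T> {
        void runFetchLoop(
                AtomicBoolean running,
                ConsumerLike consumer,
                PartitionQueue<T> unassignedPartitionsQueue) throws Exception {
            while (running.get()) {
                // offset commits happen at the top of the loop; this is what an
                // external wakeup() is trying to reach as soon as possible

                try {
                    List<T> newPartitions = unassignedPartitionsQueue.pollBatch();
                    if (newPartitions != null) {
                        reassignPartitions(newPartitions);
                    }
                } catch (AbortedReassignmentException e) {
                    // the aborted reassignment consumed the wakeup; restoring it
                    // with consumer.wakeup() and falling through would make the
                    // next poll() return immediately, so eagerly continuing here
                    // has the same effect without an extra wakeup
                    continue;
                }

                consumer.poll(100L);
            }
        }

        private void reassignPartitions(List<T> newPartitions) throws AbortedReassignmentException {
            // stand-in for KafkaConsumerThread#reassignPartitions(...)
        }
    }
    ```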


---

[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    @haohui I cannot guarantee this, but I can try revisiting this for a workaround after I've done some testing on the current RC1.


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114803295
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -219,6 +209,15 @@ public void run() {
     					}
     				}
     
    +				try {
    +					newPartitions = unassignedPartitionsQueue.pollBatch();
    +					if (newPartitions != null) {
    +						reassignPartitions(newPartitions);
    +					}
    +				} catch (AbortedReassignmentException e) {
    +					continue;
    --- End diff --
    
    One extra remark:
    
    the external `wakeup` call was intended to un-block the main thread from any blocking Kafka operation and to commit offsets as soon as possible (which happens at the start of the loop). It's also there to make sure that the main thread stops work as soon as possible, so that there aren't any strange, long job shutdown issues (which have occurred before).
    
    Hence, it's important to restore consumed wakeup calls (or mimic their behaviour) and fall through the loop when an external operation did call `wakeup`.


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114786022
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -0,0 +1,842 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.WakeupException;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyListOf;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Unit tests for the {@link KafkaConsumerThread}.
    + */
    +public class KafkaConsumerThreadTest {
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer initially had no assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(31L);
    +
    +		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignment
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +				new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					newPartition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer initially had no assignments
    +	 *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with undefined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignment
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the sentinel offset states should have been replaced with defined values according to the retrieved values
    +		assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle()) - 1, newPartition1.getOffset());
    +		assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle()) - 1, newPartition2.getOffset());
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					newPartition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer already have some assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsets() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(29L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
    +		totalPartitions.add(oldPartition1);
    +		totalPartitions.add(oldPartition2);
    +		totalPartitions.add(newPartition);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// has initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		// old partitions should be re-seeked to their previous positions
    +		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					partition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer already have some assignments
    +	 *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithoutDefinedOffsets() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with undefined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
    +		totalPartitions.add(oldPartition1);
    +		totalPartitions.add(oldPartition2);
    +		totalPartitions.add(newPartition);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// has initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the sentinel offset states should have been replaced with defined values according to the retrieved positions
    +		assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle()) - 1, newPartition.getOffset());
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		// old partitions should be re-seeked to their previous positions
    +		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +				partition.getOffset() + 1,
    +				mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer already have some assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *  - the consumer was woken up prior to the reassignment
    +	 *
    +	 * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(29L);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsToPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				true,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		// pause just before the reassignment so we can inject the wakeup
    +		testThread.waitPartitionReassignmentInvoked();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +		verify(mockConsumer, times(1)).wakeup();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the consumer's assignment should have remained untouched
    +
    +		assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
    +			assertEquals(
    +					oldPartition.getOffset() + 1,
    +					mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		// the new partitions should have been re-added to the unassigned partitions queue
    +		assertEquals(1, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer has no initial assignments
    +	 *  - new unassigned partitions have undefined offsets
    +	 *  - the consumer was woken up prior to the reassignment
    +	 *
    +	 * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				true,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		// pause just before the reassignment so we can inject the wakeup
    +		testThread.waitPartitionReassignmentInvoked();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +
    +		// make sure the consumer was actually woken up
    +		verify(mockConsumer, times(1)).wakeup();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the consumer's assignment should have remained untouched (in this case, empty)
    +		assertEquals(0, mockConsumerAssignmentsAndPositions.size());
    +
    +		// the new partitions should have been re-added to the unassigned partitions queue
    +		assertEquals(2, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests reassignment works correctly in the case when:
    +	 *  - the consumer has no initial assignments
    +	 *  - new unassigned partitions have undefined offsets
    +	 *  - the consumer was woken up during the reassignment
    +	 *
    +	 * In this case, reassignment should have completed, and the consumer is restored the wakeup call after the reassignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is logic error with
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		// these latches are used to pause midway the reassignment process
    +		final OneShotLatch midAssignmentLatch = new OneShotLatch();
    +		final OneShotLatch continueAssigmentLatch = new OneShotLatch();
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				midAssignmentLatch,
    +				continueAssigmentLatch);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +
    +		// wait until the reassignment has started
    +		midAssignmentLatch.await();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +
    +		// the wakeup the setOffsetsToCommit() call should have been buffered, and not called on the consumer
    +		verify(mockConsumer, never()).wakeup();
    +
    +		continueAssigmentLatch.trigger();
    +
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +				newPartition.getOffset() + 1,
    +				mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		// after the reassignment, the consumer should be restored the wakeup call
    +		verify(mockConsumer, times(1)).wakeup();
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * A testable {@link KafkaConsumerThread} that injects multiple latches exactly before and after
    +	 * partition reassignment, so that tests are eligible to setup various conditions before the reassignment happens
    +	 * and inspect reqssignment results after it is completed.
    --- End diff --
    
    Typo "reqssignment" 


---

[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    (please ignore the batch of commits. I'm just using this branch for an overall Travis run before committing the batch; personal accounts hit time-outs too often)


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r124785388
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -189,6 +192,8 @@
     	 * @param topics fixed list of topics to subscribe to (null, if using topic pattern)
     	 * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics)
     	 * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects.
    +	 * @param discoveryIntervalMillis the topic / partition discovery interval, in
    +	 *                                milliseconds (0 if discovery is disabled).
    --- End diff --
    
    Here it says `0` for disabled while in other places it's `Long.MIN_VALUE`.


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r116000971
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -503,23 +644,30 @@ public void close() throws Exception {
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
     		OperatorStateStore stateStore = context.getOperatorStateStore();
    -		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
     
    -		if (context.isRestored()) {
    -			if (restoredState == null) {
    -				restoredState = new HashMap<>();
    -				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
    -					restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
    -				}
    +		ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
    +			stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
     
    -				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
    -				if (LOG.isDebugEnabled()) {
    -					LOG.debug("Using the following offsets: {}", restoredState);
    -				}
    +		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
    +				OFFSETS_STATE_NAME,
    +				TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
    +
    +		if (context.isRestored() && !restoredFromOldState) {
    +			restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    +
    +			// migrate from 1.2 state, if there is any
    +			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
    +				restoredFromOldState = true;
    --- End diff --
    
    Could it be that we restore from an old 1.2 snapshot and don't get anything here because we simply weren't assigned any state? (For example, because the parallelism is higher than before.)


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r116352819
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -503,23 +644,30 @@ public void close() throws Exception {
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
     		OperatorStateStore stateStore = context.getOperatorStateStore();
    -		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
     
    -		if (context.isRestored()) {
    -			if (restoredState == null) {
    -				restoredState = new HashMap<>();
    -				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
    -					restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
    -				}
    +		ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
    +			stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
     
    -				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
    -				if (LOG.isDebugEnabled()) {
    -					LOG.debug("Using the following offsets: {}", restoredState);
    -				}
    +		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
    +				OFFSETS_STATE_NAME,
    +				TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
    +
    +		if (context.isRestored() && !restoredFromOldState) {
    +			restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    +
    +			// migrate from 1.2 state, if there is any
    +			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
    +				restoredFromOldState = true;
    --- End diff --
    
    You're right, this is definitely problematic. I think we'll have to block this feature, and only merge it once the required state information is accessible.


---

[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    @aljoscha thanks for the review! Yes, the Kafka 0.8 code hasn't been touched much, and the `KafkaPartitionDiscoverer08` is mainly just a copy of the previous partition fetching code with only minor refactoring.


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114760511
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    -		// we need only do work, if we actually have partitions assigned
    -		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -			// create the fetcher that will communicate with the Kafka brokers
    -			final AbstractFetcher<T, ?> fetcher = createFetcher(
    -					sourceContext,
    -					subscribedPartitionsToStartOffsets,
    -					periodicWatermarkAssigner,
    -					punctuatedWatermarkAssigner,
    -					(StreamingRuntimeContext) getRuntimeContext(),
    -					offsetCommitMode);
    -
    -			// publish the reference, for snapshot-, commit-, and cancel calls
    -			// IMPORTANT: We can only do that now, because only now will calls to
    -			//            the fetchers 'snapshotCurrentState()' method return at least
    -			//            the restored offsets
    -			this.kafkaFetcher = fetcher;
    -			if (!running) {
    -				return;
    -			}
    -			
    -			// (3) run the fetcher' main work method
    -			fetcher.runFetchLoop();
    +		this.runThread = Thread.currentThread();
    +
    +		// mark the subtask as temporarily idle if there are no initial seed partitions;
    +		// once this subtask discovers some partitions and starts collecting records, the subtask's
    +		// status will automatically be triggered back to be active.
    +		if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +			sourceContext.markAsTemporarilyIdle();
     		}
    -		else {
    -			// this source never completes, so emit a Long.MAX_VALUE watermark
    -			// to not block watermark forwarding
    -			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
     
    -			// wait until this is canceled
    -			final Object waitLock = new Object();
    +		// create the fetcher that will communicate with the Kafka brokers
    +		final AbstractFetcher<T, ?> fetcher = createFetcher(
    +				sourceContext,
    +				subscribedPartitionsToStartOffsets,
    +				periodicWatermarkAssigner,
    +				punctuatedWatermarkAssigner,
    +				(StreamingRuntimeContext) getRuntimeContext(),
    +				offsetCommitMode);
    +
    +		// publish the reference, for snapshot-, commit-, and cancel calls
    +		// IMPORTANT: We can only do that now, because only now will calls to
    +		//            the fetchers 'snapshotCurrentState()' method return at least
    +		//            the restored offsets
    +		this.kafkaFetcher = fetcher;
    +
    +		if (!running) {
    +			return;
    +		}
    +
    +		// depending on whether we were restored with the current state version (1.3),
    +		// remaining logic branches off into 2 paths:
    +		//  1) New state - main fetcher loop executed as separate thread, with this
    +		//                 thread running the partition discovery loop
    +		//  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +		if (!restoredFromOldState) {
    +			final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +			Thread fetcherThread = new Thread(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						// run the fetcher' main work method
    +						kafkaFetcher.runFetchLoop();
    --- End diff --
    
    Before this, the Fetcher was run in the Task thread. I'm not sure that's strictly necessary anymore, but in the past there were always problems if a Thread that was not the main Thread of a Task was emitting stuff.
    
    Is there a good reason for not starting the partition discoverer in a separate thread?


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r115996970
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -106,31 +139,69 @@ protected AbstractFetcher(
     			}
     		}
     
    -		// create our partition state according to the timestamp/watermark mode 
    -		this.subscribedPartitionStates = initializeSubscribedPartitionStates(
    -				assignedPartitionsWithInitialOffsets,
    +		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
    +
    +		// initialize subscribed partition states with seed partitions
    +		this.subscribedPartitionStates = createPartitionStateHolders(
    +				seedPartitionsWithInitialOffsets,
     				timestampWatermarkMode,
    -				watermarksPeriodic, watermarksPunctuated,
    +				watermarksPeriodic,
    +				watermarksPunctuated,
     				userCodeClassLoader);
     
    -		// check that all partition states have a defined offset
    +		// check that all seed partition states have a defined offset
     		for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
     			if (!partitionState.isOffsetDefined()) {
    -				throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
    +				throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
     			}
     		}
    -		
    +
    +		// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
    +		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
    +			unassignedPartitionsQueue.add(partition);
    +		}
    +
     		// if we have periodic watermarks, kick off the interval scheduler
     		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
    -			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
    -					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
    -			
    -			PeriodicWatermarkEmitter periodicEmitter = 
    -					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
    +			@SuppressWarnings("unchecked")
    +			PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
    --- End diff --
    
    Could add the generic parameter `KPH` here.


---

[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Travis passed, merging ...


---

[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114782717
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -219,6 +209,15 @@ public void run() {
     					}
     				}
     
    +				try {
    +					newPartitions = unassignedPartitionsQueue.pollBatch();
    +					if (newPartitions != null) {
    +						reassignPartitions(newPartitions);
    +					}
    +				} catch (AbortedReassignmentException e) {
    +					continue;
    --- End diff --
    
    Why is it necessary to continue here, instead of continuing with the rest of the loop?


---

[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Thanks for the review @aljoscha. Will address the comment and merge after Travis gives green :)



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Any updates on this? Would love to try this out :-)



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    @tzulitai we are interested in putting this feature in production soon. Do you think it is possible for us to get a workaround for 1.3? Thanks.



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114821998
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -219,6 +209,15 @@ public void run() {
     					}
     				}
     
    +				try {
    +					newPartitions = unassignedPartitionsQueue.pollBatch();
    +					if (newPartitions != null) {
    +						reassignPartitions(newPartitions);
    +					}
    +				} catch (AbortedReassignmentException e) {
    +					continue;
    --- End diff --
    
    Thanks! that makes sense




[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Hi @haohui, unfortunately we might not be able to merge this for 1.3. The reason is that the feature requires migrating from partitionable list state to union list state, which currently cannot be done properly, as Aljoscha's inline comment points out.
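    
    For readers following along, a minimal sketch of the state access involved, assuming it happens in `initializeState()` (`getUnionListState()` is the actual `OperatorStateStore` method; the descriptor name and types here are illustrative):
    
        ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> descriptor =
                new ListStateDescriptor<>(
                        "topic-partition-offset-states",
                        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));
    
        // union list state: on restore, every subtask sees ALL entries,
        // instead of the round-robin split of a plain partitionable list state
        ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates =
                context.getOperatorStateStore().getUnionListState(descriptor);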



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3746



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114788919
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -0,0 +1,842 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.WakeupException;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyListOf;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Unit tests for the {@link KafkaConsumerThread}.
    + */
    +public class KafkaConsumerThreadTest {
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer initially had no assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(31L);
    +
    +		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignment
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +				new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					newPartition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer initially had no assignments
    +	 *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with undefined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignment
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the sentinel offset states should have been replaced with defined values according to the retrieved values
    +		assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle()) - 1, newPartition1.getOffset());
    +		assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle()) - 1, newPartition2.getOffset());
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					newPartition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer already has some assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsets() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(29L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
    +		totalPartitions.add(oldPartition1);
    +		totalPartitions.add(oldPartition2);
    +		totalPartitions.add(newPartition);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// has initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		// old partitions should be re-seeked to their previous positions
    +		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +					partition.getOffset() + 1,
    +					mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer already has some assignments
    +	 *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithoutDefinedOffsets() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with undefined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3);
    +		totalPartitions.add(oldPartition1);
    +		totalPartitions.add(oldPartition2);
    +		totalPartitions.add(newPartition);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// has initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the sentinel offset states should have been replaced with defined values according to the retrieved positions
    +		assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle()) - 1, newPartition.getOffset());
    +
    +		// verify that the consumer called assign() with all new partitions, and that positions are correctly advanced
    +
    +		assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size());
    +
    +		// old partitions should be re-seeked to their previous positions
    +		for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) {
    +			assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +			// should be seeked to (offset in state + 1) because offsets in state represent the last processed record
    +			assertEquals(
    +				partition.getOffset() + 1,
    +				mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		assertEquals(0, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer already has some assignments
    +	 *  - new unassigned partitions already have defined offsets
    +	 *  - the consumer was woken up prior to the reassignment
    +	 *
    +	 * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- old partitions --------
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		oldPartition1.setOffset(23L);
    +
    +		KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		oldPartition2.setOffset(32L);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2);
    +		oldPartitions.add(oldPartition1);
    +		oldPartitions.add(oldPartition2);
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2));
    +		newPartition.setOffset(29L);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1);
    +		}
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsToPositions,
    +				Collections.<TopicPartition, Long>emptyMap(),
    +				true,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		unassignedPartitionsQueue.add(newPartition);
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		// pause just before the reassignment so we can inject the wakeup
    +		testThread.waitPartitionReassignmentInvoked();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +		verify(mockConsumer, times(1)).wakeup();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the consumer's assignment should have remained untouched
    +
    +		assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size());
    +
    +		for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) {
    +			assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
    +			assertEquals(
    +					oldPartition.getOffset() + 1,
    +					mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue());
    +		}
    +
    +		// the new partitions should have been re-added to the unassigned partitions queue
    +		assertEquals(1, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer has no initial assignments
    +	 *  - new unassigned partitions have undefined offsets
    +	 *  - the consumer was woken up prior to the reassignment
    +	 *
    +	 * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				true,
    +				null,
    +				null);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		// pause just before the reassignment so we can inject the wakeup
    +		testThread.waitPartitionReassignmentInvoked();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +
    +		// make sure the consumer was actually woken up
    +		verify(mockConsumer, times(1)).wakeup();
    +
    +		testThread.startPartitionReassignment();
    +		testThread.waitPartitionReassignmentComplete();
    +
    +		// the consumer's assignment should have remained untouched (in this case, empty)
    +		assertEquals(0, mockConsumerAssignmentsAndPositions.size());
    +
    +		// the new partitions should have been re-added to the unassigned partitions queue
    +		assertEquals(2, unassignedPartitionsQueue.size());
    +	}
    +
    +	/**
    +	 * Tests that reassignment works correctly in the case when:
    +	 *  - the consumer has no initial assignments
    +	 *  - new unassigned partitions have undefined offsets
    +	 *  - the consumer was woken up during the reassignment
    +	 *
    +	 * In this case, reassignment should have completed, and the wakeup call is restored on the consumer after the reassignment.
    +	 *
    +	 * Setting a timeout because the test will not finish if there is a logic error in
    +	 * the reassignment flow.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test(timeout = 10000)
    +	public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway() throws Exception {
    +		final String testTopic = "test-topic";
    +
    +		// -------- new partitions with defined offsets --------
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0));
    +		newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>(
    +			new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1));
    +		newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +		List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2);
    +		newPartitions.add(newPartition1);
    +		newPartitions.add(newPartition2);
    +
    +		// -------- setup mock KafkaConsumer --------
    +
    +		// no initial assignments
    +		final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +		// mock retrieved values that should replace the EARLIEST_OFFSET sentinels
    +		final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>();
    +		mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +		mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +		// these latches are used to pause midway the reassignment process
    +		final OneShotLatch midAssignmentLatch = new OneShotLatch();
    +		final OneShotLatch continueAssignmentLatch = new OneShotLatch();
    +
    +		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
    +				mockConsumerAssignmentsAndPositions,
    +				mockRetrievedPositions,
    +				false,
    +				midAssignmentLatch,
    +				continueAssignmentLatch);
    +
    +		// -------- setup new partitions to be polled from the unassigned partitions queue --------
    +
    +		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
    +			new ClosableBlockingQueue<>();
    +
    +		for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    +			unassignedPartitionsQueue.add(newPartition);
    +		}
    +
    +		// -------- start test --------
    +
    +		final TestKafkaConsumerThread testThread =
    +			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
    +		testThread.start();
    +
    +		testThread.startPartitionReassignment();
    +
    +		// wait until the reassignment has started
    +		midAssignmentLatch.await();
    +
    +		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
    +
    +		// the wakeup the setOffsetsToCommit() call should have been buffered, and not called on the consumer
    --- End diff --
    
    should be "the wake in the setOffset..."?



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    @haohui FYI - tagging you as we discussed this feature offline



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    @haohui you should be able to try the PR now; it would be great to hear feedback on this.
    As of now I'm not quite sure whether this will make it into 1.3 though; I think we'll have to see how the 1.3 release shapes up.



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Hi @haohui, I revisited this, but unfortunately I don't think there is an easy workaround.
    This can only be merged properly in 1.4, after some additional requirements are added to Flink.
    
    However, if you know that your FlinkKafkaConsumer was never rescaled while running on Flink 1.2, then in that specific case you should be able to use this patch. It would, however, require a custom build of the connector on your side.



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114793528
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    -		// we need only do work, if we actually have partitions assigned
    -		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -			// create the fetcher that will communicate with the Kafka brokers
    -			final AbstractFetcher<T, ?> fetcher = createFetcher(
    -					sourceContext,
    -					subscribedPartitionsToStartOffsets,
    -					periodicWatermarkAssigner,
    -					punctuatedWatermarkAssigner,
    -					(StreamingRuntimeContext) getRuntimeContext(),
    -					offsetCommitMode);
    -
    -			// publish the reference, for snapshot-, commit-, and cancel calls
    -			// IMPORTANT: We can only do that now, because only now will calls to
    -			//            the fetchers 'snapshotCurrentState()' method return at least
    -			//            the restored offsets
    -			this.kafkaFetcher = fetcher;
    -			if (!running) {
    -				return;
    -			}
    -			
    -			// (3) run the fetcher' main work method
    -			fetcher.runFetchLoop();
    +		this.runThread = Thread.currentThread();
    +
    +		// mark the subtask as temporarily idle if there are no initial seed partitions;
    +		// once this subtask discovers some partitions and starts collecting records, the subtask's
    +		// status will automatically be triggered back to be active.
    +		if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +			sourceContext.markAsTemporarilyIdle();
     		}
    -		else {
    -			// this source never completes, so emit a Long.MAX_VALUE watermark
    -			// to not block watermark forwarding
    -			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
     
    -			// wait until this is canceled
    -			final Object waitLock = new Object();
    +		// create the fetcher that will communicate with the Kafka brokers
    +		final AbstractFetcher<T, ?> fetcher = createFetcher(
    +				sourceContext,
    +				subscribedPartitionsToStartOffsets,
    +				periodicWatermarkAssigner,
    +				punctuatedWatermarkAssigner,
    +				(StreamingRuntimeContext) getRuntimeContext(),
    +				offsetCommitMode);
    +
    +		// publish the reference, for snapshot-, commit-, and cancel calls
    +		// IMPORTANT: We can only do that now, because only now will calls to
    +		//            the fetchers 'snapshotCurrentState()' method return at least
    +		//            the restored offsets
    +		this.kafkaFetcher = fetcher;
    +
    +		if (!running) {
    +			return;
    +		}
    +
    +		// depending on whether we were restored with the current state version (1.3),
    +		// remaining logic branches off into 2 paths:
    +		//  1) New state - main fetcher loop executed as separate thread, with this
    +		//                 thread running the partition discovery loop
    +		//  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +		if (!restoredFromOldState) {
    +			final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +			Thread fetcherThread = new Thread(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						// run the fetcher' main work method
    +						kafkaFetcher.runFetchLoop();
    --- End diff --
    
    Hmm, there actually isn't any good reason that this is required, as far as I can tell.
    
    One point regarding a non-main thread emitting elements: the Kafka 0.8 fetcher has actually always been emitting elements from different threads. So I didn't assume anything about which thread (main or separate) runs the fetch loop and which one runs the discovery loop.
    
    But I think it's also okay to swap this here.
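    
    To make the threading concrete, roughly the structure being discussed (an illustrative sketch following the PR's design, not the exact code; `discoveryIntervalMillis` is an assumed name, and wakeup/shutdown handling is elided):
    
        // the fetch loop runs in a dedicated thread; errors are published for the main thread
        Thread fetcherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    kafkaFetcher.runFetchLoop();
                } catch (Exception e) {
                    fetcherErrorRef.set(e);
                }
            }
        });
        fetcherThread.start();
    
        // ... while this thread runs the partition discovery loop
        while (running) {
            List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
            if (running && !discovered.isEmpty()) {
                kafkaFetcher.addDiscoveredPartitions(discovered);
            }
            Thread.sleep(discoveryIntervalMillis);
        }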



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r115997026
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -106,31 +139,69 @@ protected AbstractFetcher(
     			}
     		}
     
    -		// create our partition state according to the timestamp/watermark mode 
    -		this.subscribedPartitionStates = initializeSubscribedPartitionStates(
    -				assignedPartitionsWithInitialOffsets,
    +		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
    +
    +		// initialize subscribed partition states with seed partitions
    +		this.subscribedPartitionStates = createPartitionStateHolders(
    +				seedPartitionsWithInitialOffsets,
     				timestampWatermarkMode,
    -				watermarksPeriodic, watermarksPunctuated,
    +				watermarksPeriodic,
    +				watermarksPunctuated,
     				userCodeClassLoader);
     
    -		// check that all partition states have a defined offset
    +		// check that all seed partition states have a defined offset
     		for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
    --- End diff --
    
    Could add generic parameter, but is already existing code.
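    
    For illustration, the same loop with the generic parameter added (a sketch of the suggestion, not the merged code):
    
        for (KafkaTopicPartitionState<KPH> partitionState : subscribedPartitionStates) {
            if (!partitionState.isOffsetDefined()) {
                throw new IllegalArgumentException(
                        "The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }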



[GitHub] flink issue #3746: [FLINK-4022] [kafka] Partition and topic discovery for Fl...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3746
  
    Hi @aljoscha, I've rebased this PR, fixed the previous blocking issue (discovery enabling & restoring from round-robin list state), and added documentation for the limitation. Could you have another look? Thanks!



[GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114757153
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java ---
    @@ -0,0 +1,254 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Base class for all partition discoverers.
    + *
    + * <p>This partition discoverer base class implements the logic around bookkeeping
    + * discovered partitions, and using the information to determine whether or not there
    + * are new partitions that the consumer subtask should subscribe to.
    + *
    + * <p>Subclass implementations should simply implement the logic of using the version-specific
    + * Kafka clients to fetch topic and partition metadata.
    + *
    + * <p>Since Kafka clients are generally not thread-safe, partition discoverers should
    + * not be concurrently accessed. The only exception for this would be the {@link #wakeup()}
    + * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
    + */
    +public abstract class AbstractPartitionDiscoverer {
    +
    +	/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
    +	private final KafkaTopicsDescriptor topicsDescriptor;
    +
    +	/** Index of the consumer subtask that this partition discoverer belongs to. */
    +	private final int indexOfThisSubtask;
    +
    +	/** The total number of consumer subtasks. */
    +	private final int numParallelSubtasks;
    +
    +	/** Flag to determine whether or not the discoverer is closed. */
    +	private volatile boolean closed = true;
    +
    +	/**
    +	 * Flag to determine whether or not the discoverer had been woken up.
    +	 * When set to {@code true}, {@link #discoverPartitions()} would be interrupted as early as possible.
    +	 * Once interrupted, the flag is reset.
    +	 */
    +	private volatile boolean wakeup;
    +
    +	/**
    +	 * Map of topics to their largest discovered partition id seen by this subtask.
    +	 * This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or
    +	 * {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called.
    +	 *
    +	 * This is used to remove old partitions from the fetched partition lists. It is sufficient
    +	 * to keep track of only the largest partition id because Kafka partition numbers may only
    +	 * increase and have incremental ids.
    +	 */
    +	private final Map<String, Integer> topicsToLargestDiscoveredPartitionId;
    +
    +	public AbstractPartitionDiscoverer(
    +			KafkaTopicsDescriptor topicsDescriptor,
    +			int indexOfThisSubtask,
    +			int numParallelSubtasks) {
    +
    +		this.topicsDescriptor = checkNotNull(topicsDescriptor);
    +		this.indexOfThisSubtask = indexOfThisSubtask;
    +		this.numParallelSubtasks = numParallelSubtasks;
    +		this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
    +	}
    +
    +	/**
    +	 * Opens the partition discoverer, initializing all required Kafka connections.
    +	 *
    +	 * <p>NOTE: thread-safety is not guaranteed.
    +	 */
    +	public void open() throws Exception {
    +		closed = false;
    +		initializeConnections();
    +	}
    +
    +	/**
    +	 * Closes the partition discoverer, cleaning up all Kafka connections.
    +	 *
    +	 * <p>NOTE: thread-safety is not guaranteed.
    +	 */
    +	public void close() throws Exception {
    +		closed = true;
    +		closeConnections();
    +	}
    +
    +	/**
    +	 * Interrupt an in-progress discovery attempt by throwing a {@link WakeupException}.
    +	 * If no attempt is in progress, the immediate next attempt will throw a {@link WakeupException}.
    +	 *
    +	 * <p>This method can be called concurrently from a different thread.
    +	 */
    +	public void wakeup() {
    +		wakeup = true;
    +		wakeupConnections();
    +	}
    +
    +	/**
    +	 * Execute a partition discovery attempt for this subtask.
    +	 * This method lets the partition discoverer update what partitions it has discovered so far.
    +	 *
    +	 * @return List of discovered new partitions that this subtask should subscribe to.
    +	 */
    +	public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    +		if (!closed && !wakeup) {
    +			try {
    +				List<KafkaTopicPartition> newDiscoveredPartitions;
    +
    +				// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic patern
    --- End diff --
    
    Typo "patern"

