Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/07/14 12:03:55 UTC

[GitHub] flink pull request #4344: (release-1.3) [FLINK-7195] [kafka] Remove partitio...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4344

    (release-1.3) [FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer

    This issue is a reappearance of FLINK-6006. On restore, we should not use the partition list fetched from Kafka to filter the restored partition states. There are corner cases where, due to Kafka broker downtime, some partitions are missing from the fetched partition list, so manipulating the restored state based on that list may lead to broken state. To be more precise, restore should not require fetching partitions at all.
    
    We've shot ourselves in the foot again and reintroduced this bug in ed68fedbe90db03823d75a020510ad3c344fa73e. This PR adds proper unit tests for this case that do not rely on internal implementation details and test only against the public abstractions of `FlinkKafkaConsumerBase`.
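
The difference between the two restore strategies described above can be illustrated with a minimal sketch. It is not the actual `FlinkKafkaConsumerBase` code: the class `RestoreSketch`, its method names, and the use of plain strings in place of `KafkaTopicPartition` are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: partitions are plain strings here, not Flink's KafkaTopicPartition.
final class RestoreSketch {

    // Problematic pattern: restored offsets are filtered by whatever Kafka returned.
    static Map<String, Long> restoreWithFiltering(Map<String, Long> restored, List<String> fetched) {
        Map<String, Long> result = new TreeMap<>(restored);
        // Silently drops the offsets of partitions that are missing from the fetch.
        result.keySet().retainAll(fetched);
        return result;
    }

    // Pattern argued for in this PR: the restored offsets are used as-is, with no fetch at all.
    static Map<String, Long> restoreAsIs(Map<String, Long> restored) {
        return new TreeMap<>(restored);
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new TreeMap<>();
        restored.put("test-topic-0", 10L);
        restored.put("test-topic-1", 20L);
        restored.put("test-topic-2", 30L);

        // Partition 2 is missing from the fetch, e.g. because its broker is down.
        List<String> fetchedOnRestore = Arrays.asList("test-topic-0", "test-topic-1");

        // The offset of test-topic-2 is lost.
        System.out.println(restoreWithFiltering(restored, fetchedOnRestore));
        // All three offsets survive.
        System.out.println(restoreAsIs(restored));
    }
}
```

In this sketch the filtering variant loses the checkpointed offset for `test-topic-2` even though the partition still exists, which is the kind of broken state the description refers to.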

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-7195

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4344
    
----
commit 12af5d8b0e43b62935dc619258fb8f957b11d0bc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-07-14T11:51:03Z

    [FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4344: (release-1.3) [FLINK-7195] [kafka] Remove partition list ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4344
  
    @aljoscha the issue is that there may be missing partitions when querying partitions from Kafka (e.g., if some brokers are down).



[GitHub] flink issue #4344: (release-1.3) [FLINK-7195] [kafka] Remove partition list ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4344
  
    Ah, I thought you meant something else. Because this is pretty much the bug this PR is trying to solve, right? 😅 



[GitHub] flink issue #4344: (release-1.3) [FLINK-7195] [kafka] Remove partition list ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4344
  
    So partitions can be removed in Kafka?



[GitHub] flink pull request #4344: (release-1.3) [FLINK-7195] [kafka] Remove partitio...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4344#discussion_r127951191
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -532,6 +533,107 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
     		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
     	}
     
    +	@Test
    +	public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// missing fetched partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = mockFetchedPartitionsOnStartup.subList(0, 2);
    +
    +		testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
    +	}
    +
    +	@Test
    +	public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// new partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new ArrayList<>(mockFetchedPartitionsOnStartup);
    --- End diff --
    
    I've addressed this and re-opened the change as #4357, which subsumes this PR.



[GitHub] flink issue #4344: (release-1.3) [FLINK-7195] [kafka] Remove partition list ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4344
  
    For this change, I think we also need to verify how the consumers behave when a restored partition is no longer reachable. Previously, partitions that were no longer reachable were filtered out on restore, but that had the bad side effect of dropping state.



[GitHub] flink pull request #4344: (release-1.3) [FLINK-7195] [kafka] Remove partitio...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/4344



[GitHub] flink pull request #4344: (release-1.3) [FLINK-7195] [kafka] Remove partitio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4344#discussion_r127481937
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -532,6 +533,107 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
     		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
     	}
     
    +	@Test
    +	public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// missing fetched partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = mockFetchedPartitionsOnStartup.subList(0, 2);
    +
    +		testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
    +	}
    +
    +	@Test
    +	public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// new partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new ArrayList<>(mockFetchedPartitionsOnStartup);
    --- End diff --
    
    Why is this one not also using `Arrays.asList()`?
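
One possible reason for the difference, which the truncated diff does not confirm: `Arrays.asList()` returns a fixed-size list, so a test that appends a new partition to a copy of the startup list would need a growable `ArrayList`. The sketch below only demonstrates that JDK behavior; the class `FixedSizeListSketch` and its helper `canAppend` are hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the fixed-size behavior of Arrays.asList().
final class FixedSizeListSketch {

    // Returns true if an element can be appended to the given list.
    static boolean canAppend(List<String> list, String element) {
        try {
            list.add(element);
            return true;
        } catch (UnsupportedOperationException e) {
            // Fixed-size lists such as the one returned by Arrays.asList() reject add().
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> fixed = Arrays.asList("test-topic-0", "test-topic-1", "test-topic-2");
        List<String> growable = new ArrayList<>(fixed);

        System.out.println(canAppend(fixed, "test-topic-3"));    // false
        System.out.println(canAppend(growable, "test-topic-3")); // true
        System.out.println(growable.size());                     // 4
    }
}
```

A single `Arrays.asList()` call that lists every partition, including the new one, would avoid the copy entirely.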



[GitHub] flink pull request #4344: (release-1.3) [FLINK-7195] [kafka] Remove partitio...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4344#discussion_r127935505
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -532,6 +533,107 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
     		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
     	}
     
    +	@Test
    +	public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// missing fetched partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = mockFetchedPartitionsOnStartup.subList(0, 2);
    +
    +		testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
    +	}
    +
    +	@Test
    +	public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
    +			new KafkaTopicPartition("test-topic", 0),
    +			new KafkaTopicPartition("test-topic", 1),
    +			new KafkaTopicPartition("test-topic", 2));
    +
    +		// new partitions on restore
    +		List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new ArrayList<>(mockFetchedPartitionsOnStartup);
    --- End diff --
    
    will change 👌 



[GitHub] flink issue #4344: (release-1.3) [FLINK-7195] [kafka] Remove partition list ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4344
  
    @tzulitai Could you close this PR since it's subsumed by #4357?

