Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/07/11 17:11:07 UTC

[GitHub] flink pull request #4301: [FLINK-7143] [kafka] Fix indeterminate partition a...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4301

    [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

    This PR changes the mod operation for partition assignment from `i % numTasks == subtaskIndex` to `partition.hashCode % numTasks == subtaskIndex`.
    
    The bug was introduced by #3378, which moved away from sorting the partition list. The tests for partition assignment were not strict enough and did not catch this. This PR additionally adds verifications that the partitions end up in the expected subtasks, and that different partition orderings still produce the same partition assignments.
    
    Note: a fix is not required for the `master` branch, since the partition discovery changes already indirectly fixed the issue. However, test coverage for deterministic assignment should be improved in `master` as well. A separate PR will be opened for that.
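
To illustrate why the index-based rule is order-sensitive, here is a minimal stand-alone sketch. It is not Flink code: the class and method names are hypothetical, a single topic is assumed, and the plain partition id stands in for `partition.hashCode`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone model of the two assignment rules; not Flink code. */
public class AssignmentOrderDemo {

    /** Index-based rule: claim the partitions at list positions i with i % numTasks == subtaskIndex. */
    static List<Integer> claimByListIndex(List<Integer> fetched, int subtaskIndex, int numTasks) {
        List<Integer> claimed = new ArrayList<>();
        for (int i = 0; i < fetched.size(); i++) {
            if (i % numTasks == subtaskIndex) {
                claimed.add(fetched.get(i));
            }
        }
        return claimed;
    }

    /** Key-based rule: ownership depends only on the partition id (assumed stand-in for the hash). */
    static List<Integer> claimByPartitionKey(List<Integer> fetched, int subtaskIndex, int numTasks) {
        List<Integer> claimed = new ArrayList<>();
        for (int partitionId : fetched) {
            if (partitionId % numTasks == subtaskIndex) {
                claimed.add(partitionId);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        int numTasks = 2;
        // Both subtasks fetch the same four partitions, but in different orders.
        List<Integer> viewOfSubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> viewOfSubtask1 = Arrays.asList(1, 0, 2, 3);

        // Index-based: partition 0 is claimed by both subtasks, partition 1 by neither.
        System.out.println(claimByListIndex(viewOfSubtask0, 0, numTasks));
        System.out.println(claimByListIndex(viewOfSubtask1, 1, numTasks));

        // Key-based: every partition is claimed exactly once, regardless of list order.
        System.out.println(claimByPartitionKey(viewOfSubtask0, 0, numTasks));
        System.out.println(claimByPartitionKey(viewOfSubtask1, 1, numTasks));
    }
}
```

With the index-based rule, the two subtasks disagree about who owns a partition as soon as their fetched lists differ in order; with the key-based rule the list order no longer matters.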

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-7143-1.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4301
    
----
commit 563f605d00f5d184fce2eb505c59033f22d3d0ab
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-07-11T17:03:01Z

    [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    This was merged via #4357. Closing ..



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Note that this doesn't normally occur, because the strategy for assigning Kafka partitions and for assigning operator state is (currently) the same. However, it means that partition discovery will be active for parallel instances that previously had no state. Assume we have 1 partition and 1 parallel source. Now we add a new partition and restart the Flink job. Parallel instance 1 will still read from partition 0, while parallel instance 2 will think that it is starting fresh (because it didn't get state), discover partitions, and take ownership of partition 1.
    
    (This is with current `release-1.3` branch code.)



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    This would then mean we discourage restoring from a 1.3.x savepoint, because the state is potentially incorrect.
    I wonder if we then actually want to always fetch partitions on startup (fresh or from savepoint) to deal with this (just a fix for the `release-1.3` branch)?



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    I think that would fix the bug. There are two things I would like to improve, though:
    
      1. Relying on `hashCode()` makes very implicit assumptions about the behavior of the hash code implementation. This does not really document/articulate well how critical this `int` value that we rely on is. For example, by Java specification, hashCode may vary between processes - it only needs to be stable within a single JVM. Our hash code implementation happens to be stable currently, as long as the JDK does not change the implementation of the String hash code method (which they could in theory do in any minor release, although they have not done that in a while).
    
      2. It is crucial that the distribution of partitions is uniform. That is a bit harder to guarantee when all sources pick up their own set of topics. At the least, the partitions within a topic should be distributed uniformly. For example, the topic defines "where to start" in the parallel subtasks, and the partitions then go "round robin".
    Well, as it happens, this is actually the implementation of the hash code function, but again, this looks a bit like it "coincidentally" behaves like that, rather than that we have a strict contract for that behavior. For example, changing the hashCode from `31 * topic + partition` to `31 * partition + topic` results in non-uniform distribution, but is an equally valid hashCode.
    
    I would suggest having a function `int assignmentIndex()` or so, for which we define the above contract. We should also have tests that this distributes the partitions within a single topic uniformly.
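
The difference between the two hash variants mentioned above can be made concrete with a small stand-alone sketch. All names are hypothetical and `topicHash` is an assumed stand-in for the topic's hash code. The sketch counts how many distinct subtasks receive at least one partition of a single topic. One case where the skew is easy to see is a parallelism of 31, where the `31 * partition` term vanishes under the modulo.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical comparison of two equally valid hash code layouts; not Flink code. */
public class HashContractDemo {

    /** Layout described in the comment above: consecutive partitions yield consecutive hashes. */
    static int topicFirst(int topicHash, int partition) {
        return 31 * topicHash + partition;
    }

    /** Alternative layout: still a valid hash code, but partitions are spaced 31 apart. */
    static int partitionFirst(int topicHash, int partition) {
        return 31 * partition + topicHash;
    }

    /** Number of distinct subtasks that receive at least one partition of the topic. */
    static int distinctSubtasks(boolean useTopicFirst, int topicHash, int numPartitions, int numTasks) {
        Set<Integer> used = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            int hash = useTopicFirst ? topicFirst(topicHash, p) : partitionFirst(topicHash, p);
            // floorMod keeps the result in [0, numTasks) even for negative hashes.
            used.add(Math.floorMod(hash, numTasks));
        }
        return used.size();
    }

    public static void main(String[] args) {
        int topicHash = 7; // assumed value, chosen only for illustration
        // 31 partitions on 31 subtasks: one partition per subtask.
        System.out.println(distinctSubtasks(true, topicHash, 31, 31));
        // Same input with the other layout: every partition lands on the same subtask.
        System.out.println(distinctSubtasks(false, topicHash, 31, 31));
    }
}
```

For parallelisms that share no factor with 31, the second layout still spreads the partitions in this sketch, but not in ascending subtask order, which is why an explicit contract is easier to reason about than an incidental property of `hashCode()`.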



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Yes, I don't think we can get around this when restoring from "old" state.
    
    I also have another suspicion: I don't think that `KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest()` accurately catches some cases, and I think there is a problem in that we cannot accurately detect whether we are restoring or starting from scratch. Consider this case: 5 partitions, 5 parallel source instances. Now we rescale to 10 parallel source instances. Some sources don't get state, so they think that we are starting from scratch and run partition discovery. Doesn't this mean that they could read from a topic that another source is already reading from, because it got the state for that? (Note: this doesn't occur on master because all sources get all state.)



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Yes, that is true. This assignment logic is only applied on fresh starts.



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Quick comment for my own clarification: when restoring from a 1.3.x savepoint, the new assignment logic will not be used, right? In Flink 1.3.x there is no dynamic partition discovery and so when restarting from state we have to strictly stick to what we have in the (operator) state to avoid reading partitions on multiple subtasks.
    
    If this is true, this also means that folks who have savepoints on 1.3.x cannot use them if they want to benefit from this fix, right?



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    @StephanEwen
    Regarding no-rediscover on restore test:
    yes, could say that it is covered in `KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest()`. It's an end-to-end exactly-once test for the case where Flink source subtask count > partition count.
    
    Regarding `ListState`:
    The redistribution of `ListState` doesn't conflict with discovery and assignment of partitions in the `release-1.3` case (where there is no partition discovery), because we don't respect the partition assignment logic if we're starting from savepoints. We only consider what's in the restored state. See also @aljoscha's comment above.
    
    For `master`, where partition discovery is already merged, the `ListState` is a union list state, where all partition states are broadcast to all subtasks. On restore, the restored union list state is filtered again with the assignment logic.
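
The "filter the union state with the assignment logic" step can be sketched as follows. This is a stand-alone model, not the Flink implementation: the class, the `assign` rule, and the `startIndex` parameter are assumptions, and a single topic is modeled by its partition ids.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of filtering a restored union list state; not Flink code. */
public class UnionStateFilterDemo {

    /** Assumed deterministic rule: round-robin from a per-topic start index. */
    static int assign(int partitionId, int startIndex, int numSubtasks) {
        return (startIndex + partitionId) % numSubtasks;
    }

    /** Every subtask sees the full restored list and keeps only the partitions the rule gives it. */
    static List<Integer> filterRestored(
            List<Integer> unionState, int subtaskIndex, int startIndex, int numSubtasks) {
        List<Integer> mine = new ArrayList<>();
        for (int partitionId : unionState) {
            if (assign(partitionId, startIndex, numSubtasks) == subtaskIndex) {
                mine.add(partitionId);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> unionState = new ArrayList<>();
        for (int p = 0; p < 5; p++) {
            unionState.add(p);
        }
        // Rescale from 5 to 10 subtasks: each subtask filters the same full list locally.
        for (int subtask = 0; subtask < 10; subtask++) {
            System.out.println(subtask + " -> " + filterRestored(unionState, subtask, 3, 10));
        }
    }
}
```

Because each subtask applies the same rule to the same complete list, every partition ends up with exactly one owner after rescaling, and subtasks that own nothing simply keep an empty list.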



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Oye, this is more complicated than I thought. On `release-1.3` the assignment actually works if the Kafka brokers always return the partitions in the same order. The reason is that the assignment of partitions and the assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) are aligned. This means that it's not a problem when sources think that they are "fresh" (not restored from state) because they didn't get any state. If they tried to assign a partition to themselves, this would also mean that they have the state for that (again, because partition assignment and operator state assignment are aligned).
    
    This PR breaks the alignment because the `startIndex` is not necessarily `0`. However, this is not caught by any tests because the `StateAssignmentOperation` has an optimisation where it doesn't repartition operator state if the parallelism doesn't change. If we deactivate that optimisation by turning this line into `if (true)`: https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561 the test in Kafka09ITCase will fail.
    
    The fix is to properly forward the information of whether we're restored in `initializeState()`, I did a commit for that: https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings. The problem is that it is not easy to change the tests to catch this bug. I think an ITCase that uses Kafka and does a savepoint and rescaling would do the trick.
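
The distinction at stake can be sketched with a small stand-alone model. It does not reproduce the linked commit: the enum, the method names, and the boolean flag are hypothetical and only illustrate why "no state received" and "not restored" must be kept apart.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the startup decision of a source subtask; not Flink code. */
public class RestoreFlagDemo {

    enum StartupMode { FRESH_START_DISCOVER, RESTORED_USE_STATE_ONLY }

    /** Flawed heuristic: an empty restored list is taken to mean "fresh start". */
    static StartupMode decideByStateOnly(List<Integer> restoredPartitions) {
        return restoredPartitions.isEmpty()
                ? StartupMode.FRESH_START_DISCOVER
                : StartupMode.RESTORED_USE_STATE_ONLY;
    }

    /** Sketch of the fix: an explicit flag decides, so a restored subtask without state stays idle. */
    static StartupMode decideWithFlag(boolean restored) {
        return restored
                ? StartupMode.RESTORED_USE_STATE_ONLY
                : StartupMode.FRESH_START_DISCOVER;
    }

    public static void main(String[] args) {
        // A subtask that was restored but received no partitions after rescaling.
        List<Integer> noState = Collections.emptyList();
        System.out.println(decideByStateOnly(noState)); // wrongly starts discovering partitions
        System.out.println(decideWithFlag(true));       // sticks to its (empty) restored state
    }
}
```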



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    @aljoscha on second thought, I don't think we can easily deal with the fact that, when restoring from 1.3.1 / 1.3.0 savepoints in 1.3.2, users will not benefit from this bug fix.
    
    There is basically no guarantee in what the distribution would be when restoring from 1.3.1 / 1.3.0, and therefore no way to manipulate it to follow the new fixed distribution scheme we introduce here.
    It may be possible if we force a migration to union list state in 1.3.2, but I'm not really sure that we want to do that ..



[GitHub] flink issue #4301: [FLINK-7143] [kafka] Fix indeterminate partition assignme...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    R: @aljoscha @rmetzger 



[GitHub] flink pull request #4301: (release-1.3) [FLINK-7143] [kafka] Fix indetermina...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4301#discussion_r126962857
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * Utility for assigning Kafka partitions to consumer subtasks.
    + */
    +public class KafkaTopicPartitionAssigner {
    +
    +	/**
    +	 * Returns the index of the target subtask that a specific Kafka partition should be
    +	 * assigned to.
    +	 *
    +	 * <p>The resulting distribution of partitions of a single topic has the following contract:
    +	 * <ul>
    +	 *     <li>1. Uniformly distributed across subtasks</li>
    +	 *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
    +	 *     subtask indices) by using the partition id as the offset from a starting index
    +	 *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
    +	 *     determined using the topic name).</li>
    +	 * </ul>
    +	 *
    +	 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
    +	 * contract to locally filter out partitions that it should not subscribe to, guaranteeing
    +	 * that all partitions of a single topic will always be assigned to some subtask in a
    +	 * uniformly distributed manner.
    +	 *
    +	 * @param partition the Kafka partition
    +	 * @param numParallelSubtasks total number of parallel subtasks
    +	 *
    +	 * @return index of the target subtask that the Kafka partition should be assigned to.
    +	 */
    +	public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    +		int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
    --- End diff --
    
    Good catch!



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    @StephanEwen thanks for the review. Your suggestion makes a lot of sense.
    
    I've fixed this up as the following:
    
    - Have a new method `KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numSubtasks)` that defines a strict contract, such that when used locally to filter out partitions, the resulting distribution of the partitions of a single topic is guaranteed to be:
      1. Uniformly distributed across subtasks
      2. Round-robin distributed (strictly CLOCKWISE w.r.t. ascending subtask indices) by using the partition id as the offset from a starting index determined using the topic name. The extra directional contract makes this stricter than what we had before, where we may have been round-robin assigning partitions counter-clockwise. This should make the actual assignment scheme much more predictable as perceived by the user, since they just need to know the start index of a specific topic to understand how the partitions of the topic are distributed across subtasks. We could add a log message that states the start index of each topic being consumed.
    
    - Strengthen the tests in `KafkaConsumerPartitionAssignmentTest` to test this contract. Uniform distribution was already tested in that suite of tests; the change makes the tests also verify the "clockwise round-robin since some start index" contract.
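
The contract described above can be sketched and checked in isolation. This is a minimal model under stated assumptions, not the merged code: the class name is hypothetical, the topic is passed as a plain `String`, and the final `(startIndex + partitionId) % numParallelSubtasks` step is an assumption, since the thread only shows the start-index computation.

```java
/** Hypothetical stand-alone sketch of the assignment contract; not the merged Flink code. */
public class AssignerContractSketch {

    /** Start index derived from the topic name, then the partition id is used as the offset. */
    static int assign(String topic, int partitionId, int numParallelSubtasks) {
        // Masking the sign bit keeps the start index non-negative for any hash value.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Assumed final step: walk "clockwise" from the start index.
        return (startIndex + partitionId) % numParallelSubtasks;
    }

    /** Checks that partition p + 1 always lands on the subtask right after partition p. */
    static boolean isClockwiseRoundRobin(String topic, int numPartitions, int numParallelSubtasks) {
        for (int p = 0; p + 1 < numPartitions; p++) {
            int expectedNext = (assign(topic, p, numParallelSubtasks) + 1) % numParallelSubtasks;
            if (assign(topic, p + 1, numParallelSubtasks) != expectedNext) {
                return false;
            }
        }
        return true;
    }

    /** Counts how many partitions of the topic each subtask receives. */
    static int[] countPerSubtask(String topic, int numPartitions, int numParallelSubtasks) {
        int[] counts = new int[numParallelSubtasks];
        for (int p = 0; p < numPartitions; p++) {
            counts[assign(topic, p, numParallelSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(isClockwiseRoundRobin("test-topic", 10, 4));
        System.out.println(java.util.Arrays.toString(countPerSubtask("test-topic", 10, 4)));
    }
}
```

A test for the contract then only needs the start index of a topic: from there, partition ids map to subtasks in ascending order, and per-subtask counts differ by at most one.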



[GitHub] flink pull request #4301: (release-1.3) [FLINK-7143] [kafka] Fix indetermina...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/4301



[GitHub] flink pull request #4301: (release-1.3) [FLINK-7143] [kafka] Fix indetermina...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4301#discussion_r126952350
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * Utility for assigning Kafka partitions to consumer subtasks.
    + */
    +public class KafkaTopicPartitionAssigner {
    +
    +	/**
    +	 * Returns the index of the target subtask that a specific Kafka partition should be
    +	 * assigned to.
    +	 *
    +	 * <p>The resulting distribution of partitions of a single topic has the following contract:
    +	 * <ul>
    +	 *     <li>1. Uniformly distributed across subtasks</li>
    +	 *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
    +	 *     subtask indices) by using the partition id as the offset from a starting index
    +	 *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
    +	 *     determined using the topic name).</li>
    +	 * </ul>
    +	 *
    +	 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
    +	 * contract to locally filter out partitions that it should not subscribe to, guaranteeing
    +	 * that all partitions of a single topic will always be assigned to some subtask in a
    +	 * uniformly distributed manner.
    +	 *
    +	 * @param partition the Kafka partition
    +	 * @param numParallelSubtasks total number of parallel subtasks
    +	 *
    +	 * @return index of the target subtask that the Kafka partition should be assigned to.
    +	 */
    +	public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    +		int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
    --- End diff --
    
    Minor detail: `Math.abs` does not work for `Integer.MIN_VALUE`, so it is slightly safer to do
    ```java
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    ```
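
The `Integer.MIN_VALUE` corner case can be checked with a few lines of plain Java. The class and helper names below are hypothetical and only serve the illustration.

```java
/** Small demonstration of the Math.abs corner case; names are illustrative only. */
public class AbsMinValueDemo {

    /** Clears the sign bit, so the result is never negative. */
    static int nonNegativeByMask(int hash) {
        return hash & 0x7FFFFFFF;
    }

    public static void main(String[] args) {
        // The positive counterpart of Integer.MIN_VALUE does not fit into an int,
        // so Math.abs returns the (negative) argument unchanged.
        System.out.println(Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE); // true

        // Masking always yields a value in [0, Integer.MAX_VALUE].
        System.out.println(nonNegativeByMask(Integer.MIN_VALUE)); // 0
        System.out.println(nonNegativeByMask(-1) == Integer.MAX_VALUE); // true
    }
}
```

Note that masking is not the same function as the absolute value (for example, `-1` maps to `Integer.MAX_VALUE`, not `1`), so switching between the two changes the resulting start index and therefore the assignment.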



[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Just to double-check: I see that the state of the partitions is in a `ListState`. That means after recovery, they can be differently distributed than before. Does that not conflict with the discovery and assignment of partitions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4301: (release-1.3) [FLINK-7143] [kafka] Fix indeterminate part...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    Do we have a test for the case where there are fewer partitions than sources so that some sources do not get partitions on restore? To make sure they do not accidentally re-discover?

