Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/07/18 11:40:48 UTC

[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4357

    (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafka fixes for release-1.3

    This PR subsumes #4344 and #4301: it merges the changes from both PRs and resolves the conflicts between them.
    Some of the new tests added in one PR also depend on the fix in the other, so I am opening this combined PR to give a clearer overall view of the status of the fixes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink kafka-13-fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4357
    
----
commit 919b23a6e1c650a3d08f5418a53e712e86d51506
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-07-11T17:03:01Z

    [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer
    
    Apart from fixing the previously incorrect, nondeterministic assignment
    logic, this commit also adds an explicitly defined method that states a
    strict contract for the assignment, instead of relying on a hashCode
    implementation that conveys neither this contract nor the importance of
    the assignment being deterministic.
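A minimal sketch of what an explicit, deterministic assignment contract can look like. The class and method names (`PartitionAssignmentSketch`, `assign`, `partitionsFor`) are hypothetical, and topics and partitions are plain strings and ints rather than Flink's `KafkaTopicPartition`; this illustrates the idea and is not the code in the commit. The key property is that the owner of a partition depends only on the topic name, the partition number, and the parallelism, never on discovery order or object identity.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a deterministic partition-to-subtask assignment.
 * Every subtask evaluates the same pure function, so all subtasks agree on
 * ownership without coordinating, whatever order partitions were discovered in.
 */
final class PartitionAssignmentSketch {

    private PartitionAssignmentSketch() {}

    /** Returns the index (0..numSubtasks-1) of the subtask that owns the partition. */
    static int assign(String topic, int partition, int numSubtasks) {
        if (numSubtasks <= 0) {
            throw new IllegalArgumentException("numSubtasks must be positive");
        }
        // String.hashCode() is fully specified by the Java API, so this start
        // index is identical on every JVM; masking the sign bit keeps it >= 0.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        // Partitions of one topic are spread round-robin, starting at startIndex.
        return (startIndex + partition) % numSubtasks;
    }

    /** Keeps only the discovered partitions owned by the given subtask. */
    static List<Integer> partitionsFor(String topic, List<Integer> discovered,
                                       int subtaskIndex, int numSubtasks) {
        List<Integer> owned = new ArrayList<>();
        for (int p : discovered) {
            if (assign(topic, p, numSubtasks) == subtaskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }
}
```

Because ownership is a pure function of its inputs, reordering the discovered partition list cannot change which subtask reads which partition.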

commit 00bcdbf24c177276f203063f905886becfe23db5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-07-14T11:51:03Z

    [FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer
    
    Previously, querying the partition list and using it to filter out
    restored partition states was problematic, since the queried partition
    list may be missing partitions due to temporary downtime of Kafka
    brokers. Effectively, this could cause state to be dropped on
    restore.
    
    This commit fixes this by completely removing partition querying if
    we're restoring state (as notified by
    FunctionInitializationContext.isRestored()). The subscribed partitions
    will always be exactly what the restored state contains.
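The difference between the two restore strategies can be sketched as follows. This is a simplified, hypothetical model (partitions as strings, offsets as longs, a `Supplier` standing in for the partition query), not the connector's code; it only shows why filtering restored state by a freshly queried list can lose offsets.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch contrasting two ways of choosing the partitions a
 * restored consumer subtask subscribes to.
 */
final class RestoreSubscriptionSketch {

    private RestoreSubscriptionSketch() {}

    /** Problematic approach: keep only restored partitions that Kafka currently reports. */
    static Map<String, Long> filteredByQuery(Map<String, Long> restored,
                                             Supplier<List<String>> queryPartitions) {
        List<String> reported = queryPartitions.get();
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : restored.entrySet()) {
            // A partition whose broker is temporarily down may be missing from
            // 'reported', so its restored offset is silently dropped here.
            if (reported.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    /** Approach described in the commit: when restoring, use the state as-is. */
    static Map<String, Long> fromRestoredState(Map<String, Long> restored) {
        // No partition query at all: the subscription is exactly the restored state.
        return new HashMap<>(restored);
    }
}
```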

commit a4ca2f559b1d530e68ce3516035964f569ff7c7f
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-07-17T17:06:09Z

    [FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer
    
    Previously, empty state was taken to mean that the source was not
    restored. However, a source can have empty restored state in either
    of two cases:
    
    1. The source was not restored.
    2. The overall job was restored but the source simply didn't get any
    operator state assigned.
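A hypothetical sketch of the two ways of deciding whether to start fresh. The `isRestored` parameter stands in for `FunctionInitializationContext.isRestored()`; the method names and the `-1L` sentinel meaning "use the configured startup mode" are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of initial-subscription logic for one source subtask. */
final class RestoredFlagSketch {

    private RestoredFlagSketch() {}

    /** Buggy variant: treats empty restored state as "not restored". */
    static Map<String, Long> buggyInitial(Map<String, Long> restoredState,
                                          List<String> ownedByDiscovery) {
        if (restoredState == null || restoredState.isEmpty()) {
            // Wrongly falls back to fresh discovery; after rescaling, these
            // partitions may already be held in another subtask's restored state.
            return startFresh(ownedByDiscovery);
        }
        return new HashMap<>(restoredState);
    }

    /** Fixed variant: trusts the explicit restored flag instead of emptiness. */
    static Map<String, Long> fixedInitial(boolean isRestored,
                                          Map<String, Long> restoredState,
                                          List<String> ownedByDiscovery) {
        if (isRestored) {
            // Restored with no state assigned means "own nothing", not "start over".
            if (restoredState == null) {
                return new HashMap<>();
            }
            return new HashMap<>(restoredState);
        }
        return startFresh(ownedByDiscovery);
    }

    /** Marks partitions with the illustrative sentinel -1L ("use startup mode"). */
    private static Map<String, Long> startFresh(List<String> partitions) {
        Map<String, Long> result = new HashMap<>();
        for (String p : partitions) {
            result.put(p, -1L);
        }
        return result;
    }
}
```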

commit faf957209220d2779062321d7ab58c9356906ad8
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-07-18T08:35:54Z

    [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase
    
    This prevents concrete Kafka source implementations from accidentally
    overriding the checkpointing methods. Such overrides would go untested:
    the tests exercise the checkpoint methods of the ConsumerBase, but
    overridden versions in derived classes would not be covered.
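A minimal sketch of the idea, with hypothetical class names and a simplified `snapshotState(long)` signature (the real method takes a `FunctionSnapshotContext`): the base class owns the checkpoint logic and marks it `final`, so version-specific subclasses can only plug in through explicit extension points.

```java
/**
 * Hypothetical sketch: 'final' checkpoint logic in the base class keeps tests
 * written against the base class valid for every subclass.
 */
abstract class ConsumerBaseSketch {

    private long lastSnapshotOffset = -1L;

    /** Checkpoint logic shared by all versions; subclasses cannot override it. */
    public final void snapshotState(long currentOffset) {
        lastSnapshotOffset = currentOffset;
    }

    public final long lastSnapshotOffset() {
        return lastSnapshotOffset;
    }

    /** Version-specific behaviour is confined to explicit extension points. */
    protected abstract String kafkaVersion();
}

final class Consumer09Sketch extends ConsumerBaseSketch {

    @Override
    protected String kafkaVersion() {
        return "0.9";
    }

    // Declaring 'public void snapshotState(long currentOffset)' here would fail
    // to compile, because the method is final in ConsumerBaseSketch.
}
```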

commit 5180f898c48ce2e416547dcdf76caef72c5a8dee
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-07-18T09:57:46Z

    [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling
    
    This verifies that the consumer always correctly knows whether it is
    restored or not and is not affected by changes in the partitions as
    reported by Kafka.
    
    Previously, operator state reshuffling could lead to partitions being
    subscribed to multiple times.
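The property such a rescaling test checks can be sketched without Flink. This is a hypothetical model: each subtask's operator state is a list of partition names, and the round-robin redistribution below is an assumption standing in for Flink's actual operator state repartitioning.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: after operator list state is redistributed to a new
 * parallelism, every partition must be owned by exactly one subtask.
 */
final class RescalingCheckSketch {

    private RescalingCheckSketch() {}

    /** Gathers all subtasks' partition states and deals them out round-robin. */
    static List<List<String>> redistribute(List<List<String>> oldStates, int newParallelism) {
        List<List<String>> newStates = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newStates.add(new ArrayList<>());
        }
        int next = 0;
        for (List<String> state : oldStates) {
            for (String partition : state) {
                newStates.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return newStates;
    }

    /** True if no partition is subscribed twice and none is lost. */
    static boolean eachPartitionExactlyOnce(List<List<String>> states, Set<String> expected) {
        Set<String> seen = new HashSet<>();
        for (List<String> state : states) {
            for (String partition : state) {
                if (!seen.add(partition)) {
                    return false; // subscribed by more than one subtask
                }
            }
        }
        return seen.equals(expected);
    }
}
```

A real test would additionally assert that subtasks which received no state after rescaling do not fall back to discovering and subscribing to partitions on their own.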

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafk...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4357
  
    @aljoscha do you have any last comments on these changes?



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128147490
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
     					LOG.debug("Using the following offsets: {}", restoredState);
     				}
     			}
    -			if (restoredState != null && restoredState.isEmpty()) {
    -				restoredState = null;
    -			}
     		} else {
     			LOG.info("No restore state for FlinkKafkaConsumer.");
     		}
     	}
     
     	@Override
    -	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +	public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    --- End diff --
    
    On the other hand, I think that this part of your description is strange:
    >we can't set Flink checkpoint to false, because otherwise Kafka consumer auto.commit will be hard-coded to true.
    This should not be the case (at least starting from Flink 1.3.x). The "auto.commit" setting is no longer forced by the checkpointing setting. If you don't enable checkpointing, "auto.commit" decides whether or not the Kafka client's periodic offset committing is used. If checkpointing is enabled, you can still disable offset committing on checkpoints by using `FlinkKafkaConsumerBase#setCommitOffsetsOnCheckpoints(false)`.
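The decision described in this comment can be summarized as a small table, sketched here in Java. The enum and method names are hypothetical and not Flink's API; the sketch only encodes the rules for 1.3: with checkpointing disabled, the Kafka client's auto-commit setting decides; with checkpointing enabled, the client's auto-commit is ignored and the consumer's own commit-on-checkpoint switch decides.

```java
/** Hypothetical sketch of how the offset-commit behaviour is chosen. */
final class OffsetCommitSketch {

    enum CommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    private OffsetCommitSketch() {}

    static CommitMode resolve(boolean checkpointingEnabled,
                              boolean commitOnCheckpoints,
                              boolean kafkaAutoCommit) {
        if (checkpointingEnabled) {
            // The Kafka client's auto.commit is ignored; offsets are committed
            // only when a checkpoint completes, and only if the switch is on.
            return commitOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        // Without checkpointing, the Kafka client's periodic auto-commit decides.
        return kafkaAutoCommit ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }
}
```

Under these rules, if the goal is to avoid offset committing entirely, leaving checkpointing disabled and setting `enable.auto.commit` to `false` resolves to `DISABLED` without touching `snapshotState`.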



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128147542
  
    
    See https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration.



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/4357



[GitHub] flink issue #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafk...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4357
  
    I reviewed this one last time with Stephan offline, and we think we should go with this version. Overriding the snapshot methods is most likely no longer necessary, but if it is, we will "un-finalize" them again before the release.
    
    @tzulitai Do you want to have the honours of merging? 😃 




[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128755399
  
    
    @aljoscha I think that's a good approach, and it avoids having to make the methods final for now. It would also be a good opportunity to clean up the `FlinkKafkaConsumerBaseTest` test a bit :)



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128147294
  
    
    @stevenzwu the snapshotState method was never intended to be overridden, so making it final here states that clearly. For example, the version-specific implementations of `FlinkKafkaConsumerBase` might override it with an incorrect implementation, and our tests would never notice.



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by stevenzwu <gi...@git.apache.org>.
Github user stevenzwu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128113797
  
    
    @tzulitai what's the reason for making `snapshotState` final? In our router use case, we override the snapshotState method to be a no-op. We disabled Flink checkpointing by setting the checkpoint interval to Long.MAX_VALUE. We can't turn Flink checkpointing off entirely, because otherwise the Kafka consumer's auto.commit will be hard-coded to true.
    
    @zhenzhongxu ^



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by stevenzwu <gi...@git.apache.org>.
Github user stevenzwu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128642186
  
    
    @tzulitai it looks like the behavior was changed/fixed in 1.3. Here is the Kafka09Fetcher.java code from 1.2 that was causing the behavior I described earlier.
    
    ```java
    // if checkpointing is enabled, we are not automatically committing to Kafka.
    kafkaProperties.setProperty(
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            Boolean.toString(!enableCheckpointing));
    ```
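To make the consequence of that 1.2 snippet concrete, here is a hypothetical, dependency-free re-creation of the same rule. It uses the literal key `"enable.auto.commit"` (the Kafka consumer property that `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` names) so it compiles without Kafka on the classpath; the class and method names are made up.

```java
import java.util.Properties;

/** Hypothetical re-creation of the 1.2 auto-commit rule quoted above. */
final class LegacyAutoCommitSketch {

    private LegacyAutoCommitSketch() {}

    static Properties applyLegacyRule(Properties userProps, boolean enableCheckpointing) {
        // Copy so the caller's Properties object is left untouched.
        Properties effective = new Properties();
        effective.putAll(userProps);
        // The user's own value is overwritten unconditionally: auto-commit is
        // always the negation of the checkpointing flag.
        effective.setProperty("enable.auto.commit", Boolean.toString(!enableCheckpointing));
        return effective;
    }
}
```

This is why, on 1.2, leaving checkpointing disabled always turned Kafka auto-commit on, regardless of what the user had configured.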



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by stevenzwu <gi...@git.apache.org>.
Github user stevenzwu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128642917
  
    
    ```
    the verrsion-specific implementations for FlinkKafkaConsumerBase may override that and have incorrect implementations, where as our tests would never realize it.
    ```
    @tzulitai why would this be a concern for FlinkKafkaConsumerBase? If version-specific implementations have bugs, they should have tests to catch and prevent them. We do need the ability to override the snapshot method with a no-op. What would be your suggested alternative?



[GitHub] flink pull request #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128754182
  
    
    The concern here is that not making the methods final makes it easy for contributors to accidentally override them. We don't have specific unit tests for the 0.9 `FlinkKafkaConsumer` or the 0.10 `FlinkKafkaConsumer` and only test the base `FlinkKafkaConsumerBase`. This is OK, as long as specific implementations don't override important methods. If the `FlinkKafkaConsumer090` did override the `snapshot()`/`restore()` methods, for example, no tests would catch this.
    
    @tzulitai I don't want to discuss these methods too much here, since we want to get the fixes in for release 1.3.2. One way around the problem is to turn `FlinkKafkaConsumerBaseTest` into an abstract `FlinkKafkaConsumerBaseTestBase` with an abstract method `createTestingConsumer(List<KafkaTopicPartition> mockFetchedPartitions)` that creates a "dummy" consumer for a specific Kafka version. We would then have individual `FlinkKafkaConsumer09Test`, `FlinkKafkaConsumer010Test`, and so on, which derive from the abstract test base and only implement the method that creates the testing consumer.
    
    What do you think?
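A minimal sketch of the proposed test layout, with hypothetical names throughout. Plain methods stand in for JUnit tests and a one-method interface stands in for a real consumer, so it runs without dependencies; in the actual proposal the hook would take a `List<KafkaTopicPartition>` and return a dummy version-specific consumer.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical abstract test base: shared checks, version-specific factory. */
abstract class ConsumerTestBaseSketch {

    /** Minimal stand-in for a consumer under test (illustrative only). */
    interface TestingConsumer {
        List<String> subscribedPartitions();
    }

    /** Mirrors the proposed createTestingConsumer(...) hook. */
    protected abstract TestingConsumer createTestingConsumer(List<String> mockFetchedPartitions);

    /** A shared check that every version-specific subclass inherits and runs. */
    final boolean subscribesToAllFetchedPartitions() {
        List<String> fetched = Arrays.asList("topic-0", "topic-1");
        return createTestingConsumer(fetched).subscribedPartitions().equals(fetched);
    }
}

final class Consumer09TestSketch extends ConsumerTestBaseSketch {
    @Override
    protected TestingConsumer createTestingConsumer(List<String> mockFetchedPartitions) {
        // A real test would build a dummy 0.9 consumer here.
        return () -> mockFetchedPartitions;
    }
}

final class Consumer010TestSketch extends ConsumerTestBaseSketch {
    @Override
    protected TestingConsumer createTestingConsumer(List<String> mockFetchedPartitions) {
        // A real test would build a dummy 0.10 consumer here.
        return () -> mockFetchedPartitions;
    }
}
```

Each version-specific test class only supplies the factory, so the shared checks run once per Kafka version and would also exercise any method a subclass overrides.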



[GitHub] flink issue #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafk...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4357
  
    I don't, but I would like to give @StephanEwen a chance to comment on the changes that make the checkpoint-related methods of the consumer base final.



[GitHub] flink issue #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafk...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4357
  
    R: @aljoscha 



[GitHub] flink issue #4357: (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafk...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4357
  
    @aljoscha sure, merging this now :)

