Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/07/13 12:04:12 UTC

[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4321

    [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

    This patch also fixes an incompatibility with the latest Kafka 0.10.x and 0.11.x kafka-clients.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink kafka010

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4321
    
----
commit e8aac4d3842c433ffc40e36c696950057e5139b9
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-07-13T11:58:29Z

    [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128180555
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -335,6 +356,9 @@ public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToC
     	 */
     	@VisibleForTesting
     	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    +		if (newPartitions.size() > 0) {
    +			hasAssignedPartitions = true;
    +		}
    --- End diff --
    
    Should we actually extend this `if` block to wrap the whole code in `reassignPartitions`? I.e., we shouldn't be doing the reassignment logic if `newPartitions.size() == 0`.



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128193424
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
     			final OneShotLatch continueAssignmentLatch) {
     
     		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
    +		final AtomicInteger callCounter = new AtomicInteger();
    +
     		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
     			@Override
     			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -				if (midAssignmentLatch != null) {
    -					midAssignmentLatch.trigger();
    -				}
    +				// first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --
    
    This change is no longer needed now that I have dropped the `this.hasAssignedPartitions = !consumer.assignment().isEmpty();` assignment (that was the "first call" that was causing the problems).



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    I have added a unit test for closing. I think it should be triggered/tested in one of the `ITCases`, but the test is fairly simple, so it shouldn't hurt us to have this tested explicitly.



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    This patch looks good.
    
    As a minor comment: I would prefer to not have `hasAssignedPartitions` as a field, but rather return it from the `reassignPartitions()` method and have it only as a local variable in the `run()` method.
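
The return-value variant suggested here could look roughly like the sketch below. It is only an illustration under stated assumptions: `FetchLoopSketch`, the `String` partition names, and the batch-driven `run(...)` signature are hypothetical simplifications, not the actual `KafkaConsumerThread` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification; not the real KafkaConsumerThread.
class FetchLoopSketch {
    private final List<String> assigned = new ArrayList<>();

    // Adds the new partitions and reports whether anything is assigned afterwards.
    boolean reassignPartitions(List<String> newPartitions) {
        assigned.addAll(newPartitions);
        return !assigned.isEmpty();
    }

    // Walks through the given batches and returns the number of iterations
    // in which a poll would have been allowed.
    int run(List<List<String>> batches) {
        boolean hasAssignedPartitions = false; // local variable, not a field
        int allowedPolls = 0;
        for (List<String> batch : batches) {
            if (!batch.isEmpty()) {
                hasAssignedPartitions = reassignPartitions(batch);
            }
            if (hasAssignedPartitions) {
                allowedPolls++;
            }
        }
        return allowedPolls;
    }
}
```

The trade-off is that the flag is then only correct as long as every caller of `reassignPartitions()` uses the returned value.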



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Dropping this field would make it more error-prone in the future if anyone were to call `reassignPartitions()` from somewhere else. For me, `hasAssignedPartitions` is tightly related to the state of the `consumer` field (in a perfect world it would be exposed via `consumer`'s API...), and thus it should also be maintained as class state.



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128180697
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
     			final OneShotLatch continueAssignmentLatch) {
     
     		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
    +		final AtomicInteger callCounter = new AtomicInteger();
    +
     		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
     			@Override
     			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -				if (midAssignmentLatch != null) {
    -					midAssignmentLatch.trigger();
    -				}
    +				// first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --
    
    Let's remove the last part about loving mocks ;-) I do understand your argument on mocking, though.



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    I think the new pull request description template would have been awesome here ;-)



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Fair enough
     +1 then



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4321



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128181037
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
     			final OneShotLatch continueAssignmentLatch) {
     
     		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
    +		final AtomicInteger callCounter = new AtomicInteger();
    +
     		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
     			@Override
     			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -				if (midAssignmentLatch != null) {
    -					midAssignmentLatch.trigger();
    -				}
    +				// first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --
    
    Could you explain a bit on "first call is not the one that we want to catch"? Which test was failing? I have the feeling that this could have been fixed in a different way.



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    @tzulitai could you look at this PR, and particularly at the last commit (fixup)? I'm not a big fan of mocks and Mockito-based tests, and I would really be inclined to just drop this test.



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Good catch with this spinning, I missed that.
    
    Checking for assigned partitions on each iteration is unfortunately costly, because there is no cheap `isEmpty()` method. The one that I have found, `consumer.assignment()`, is pretty costly (it creates quite a lot of objects and takes some locks). I wouldn't want to call it very often.
    
    I could move this variable to the local scope of the `run()` function, but it would be a little bit more error-prone (in case of some refactoring that, for example, calls `reassignPartitions()` from somewhere outside of the `run()` method).



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    @pnowojski we can't just drop that test, IMO. It's crucial that those tests exist to guard against incorrect reassignment logic in the `KafkaConsumerThread`. Breaking that would mess up the shutdown responsiveness of the consumer thread.
    
    I'm not sure why your current fix would be bad, though. Or why do you want it to be removed?



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    I think this pull request will make the Kafka consumer go into a hot busy waiting loop when it has no partitions assigned.
    
    I would suggest doing a blocking `take()` or similar on the `unassignedPartitionsQueue`.
    
    Also, it would be great to avoid the instance variable, and simply check how many partitions are assigned on the KafkaConsumer, or pass this via a return value of the `reassignPartitions()` function.



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128178914
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -210,14 +214,28 @@ public void run() {
     				}
     
     				try {
    -					newPartitions = unassignedPartitionsQueue.pollBatch();
    +					if (hasAssignedPartitions) {
    +						newPartitions = unassignedPartitionsQueue.pollBatch();
    +					}
    +					else {
    +						// if no assigned partitions block until we get at least one
    +						// instead of hot spinning this loop. We relay on a fact that
    --- End diff --
    
    nit: typo? relay -> rely



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Hmm, would a blocking operation be appropriate here? It would prevent `shutdown()` from actually breaking the loop. I think we would need some timeout here.
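
One way to picture the timeout idea is the sketch below. It is an assumption-laden illustration, not the code from this pull request: `PartitionWaiter`, the 50 ms timeout, and the use of a plain `java.util.concurrent.LinkedBlockingQueue` in place of `unassignedPartitionsQueue` are all hypothetical. The loop waits on the queue with a bounded timeout, so it neither spins hot nor ignores a shutdown request for long.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification; a plain BlockingQueue stands in for the real queue.
class PartitionWaiter implements Runnable {
    private final BlockingQueue<String> unassignedPartitions = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private volatile String firstPartition;

    void addPartition(String partition) {
        unassignedPartitions.add(partition);
    }

    void shutdown() {
        running = false;
    }

    String firstPartition() {
        return firstPartition;
    }

    @Override
    public void run() {
        try {
            while (running && firstPartition == null) {
                // Wait up to 50 ms (an arbitrary value) instead of spinning,
                // then loop around to re-check the 'running' flag.
                String partition = unassignedPartitions.poll(50, TimeUnit.MILLISECONDS);
                if (partition != null) {
                    firstPartition = partition;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and leave the loop.
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, a `shutdown()` call is noticed at the latest one timeout interval later, at the cost of periodic wake-ups while no partitions are available.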



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Looks good now, +1 on my side.
    Let's also wait a bit for @StephanEwen to see if he has any more comments regarding the use of an extra `hasAssignedPartitions` field (since he commented on that before).



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    Thanks @tzulitai 



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    I have also squashed the previous fixups; there is now only one new fixup.



[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128179835
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -151,6 +154,7 @@ public void run() {
     		// including concurrent 'close()' calls.
     		try {
     			this.consumer = getConsumer(kafkaProperties);
    +			this.hasAssignedPartitions = !consumer.assignment().isEmpty();
    --- End diff --
    
    Can't we just start with `false` here?
    We'll only ever get partitions once we enter the main fetch loop.



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    One other question: I need a bit more context on why the version bump requires that change in the `KafkaConsumerThread`. From what I perceive, fixing the hot looping should be a separate issue, no?



[GitHub] flink issue #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4321
  
    -- IMO begin
    Mockito tests tend to repeat the implementation. Instead of testing for the effect, they tend to do the same thing as the actual code, but backwards. In other words, they make about as much sense as writing the same feature/code twice and then comparing whether the outcome is the same. That is valuable at first, because you make sure that you didn't make any mistakes. But after that they make your life miserable, because changes in the actual code so often break them, and you have to implement the same thing twice.
    
    Exactly like in this case. I added a `consumer.assignment()` call in the production code and then had to spend quite a bit of time understanding why some strange test deadlocked. To fix it, I had to implement the same change in the mock as in the production code.
    -- IMO ends
    
    If you have a different opinion we can leave it as it is :) It's not worth arguing about that much.
    
    There is a comment in the code, but sorry that I didn't state it more clearly in this PR itself:
    ```
    // Without assigned partitions KafkaConsumer.poll will throw an exception
    ```
    After the version bump (and in Kafka 0.11), `KafkaConsumer.poll()` throws an `IllegalStateException` if it doesn't have any assigned partitions. Thus we need to skip this call in that case.
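
The guard described here can be sketched as follows. This is a minimal illustration under stated assumptions: `StubConsumer` is a hypothetical stand-in that only imitates the "throw when nothing is assigned" behavior and is not the kafka-clients API, and `GuardedFetcher` is a hypothetical simplification of the consumer thread, with `String` partition names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the consumer; this is NOT the kafka-clients API.
class StubConsumer {
    private final List<String> assigned = new ArrayList<>();
    int pollCalls = 0;

    void assign(List<String> partitions) {
        assigned.addAll(partitions);
    }

    // Imitates the behavior described above: polling without an assignment fails.
    List<String> poll() {
        if (assigned.isEmpty()) {
            throw new IllegalStateException("no partitions assigned");
        }
        pollCalls++;
        return new ArrayList<>(assigned);
    }
}

// Hypothetical simplification of the consumer thread's state handling.
class GuardedFetcher {
    private final StubConsumer consumer;
    private boolean hasAssignedPartitions = false;

    GuardedFetcher(StubConsumer consumer) {
        this.consumer = consumer;
    }

    void reassignPartitions(List<String> newPartitions) {
        if (!newPartitions.isEmpty()) {
            hasAssignedPartitions = true;
        }
        consumer.assign(newPartitions);
    }

    // One loop iteration: skip poll() entirely while nothing is assigned.
    List<String> fetchOnce() {
        if (!hasAssignedPartitions) {
            return Collections.emptyList();
        }
        return consumer.poll();
    }
}
```

Keeping the flag next to the consumer means the check is a cheap boolean read on every iteration, rather than a call into the consumer itself.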


