Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/01/12 02:14:06 UTC

[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5284

    [FLINK-8306] [kafka, tests] Fix invalid mock verifications on final method

    ## What is the purpose of the change
    
    This is a reworked version of #5200.
    Instead of introducing a new interface while we are still unsure which direction the refactoring of the Kafka consumer should take, this version is a simple fix to have proper mocks in the `FlinkKafkaConsumerBase` tests.
    
    Only the last two commits are relevant (PR is based on #5188).
    
    ## Brief change log
    
    - ca1aa00 removes the invalid mock verifications and introduces a proper `MockFetcher` mock.
    - d08727c is a hotfix that corrects a stale comment describing behaviour that no longer exists.
    
    ## Verifying this change
    
    This change is a code cleanup, and is already covered by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8306-reworked

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5284
    
----
commit 735972332ff25623898b25f210b7277aafbc7351
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2017-12-20T00:10:44Z

    [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection
    
    Reflection was mainly used to inject mocks into private fields of the
    FlinkKafkaConsumerBase, without the need to fully execute all operator
    life cycle methods. This, however, caused the unit tests to be too
    implementation-specific.
    
    This commit reworks the FlinkKafkaConsumerBaseTest to remove test
    consumer instantiation methods that rely on reflection for dependency
    injection. All tests now instantiate dummy test consumers normally, and
    let all tests properly execute all operator life cycle methods
    regardless of the tested logic.

commit ca1aa0074c0ae4ff8e2da4f8d87b8378c2413ef5
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-12T00:45:32Z

    [FLINK-8306] [kafka, tests] Fix mock verifications on final method
    
    Previously, offset commit behavioural tests relied on verifying on
    AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually
    final, and could not be mocked.
    
    This commit fixes that by implementing a proper mock AbstractFetcher,
    which keeps track of the offset commits that go through.

commit d08727c4f8816e408dabc12a673ad24593b27023
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2017-12-20T19:54:40Z

    [hotfix] [kafka] Remove stale comment on publishing procedures of AbstractFetcher
    
    The previous comment mentioned "only now will the fetcher return at
    least the restored offsets when calling snapshotCurrentState()". This is
    a remnant of the previous fetcher initialization behaviour, where in the
    past the fetcher wasn't directly seeded with restored offsets on
    instantiation.
    
    Since this is no longer true, this commit fixes the stale comment to
    avoid confusion.

----
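
The idea in commit ca1aa00 (replace verification on a final method with a hand-written mock that records commits) can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `BaseFetcher`, `RecordingFetcher`, `commitOffsets`, and `doCommitOffsets` are hypothetical names, and the delegation from the final method to an overridable hook is an assumption of this sketch rather than something the thread states.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a fetcher base class whose public commit method is final.
abstract class BaseFetcher {
    // Final, so a test cannot rely on stubbing or verifying this method directly.
    public final void commitOffsets(Map<String, Long> offsets) {
        // Assumption: the final method delegates to an overridable hook.
        doCommitOffsets(new HashMap<>(offsets));
    }

    protected abstract void doCommitOffsets(Map<String, Long> offsets);
}

// Hand-written test double that keeps track of every commit that goes through.
class RecordingFetcher extends BaseFetcher {
    private final List<Map<String, Long>> commits = new ArrayList<>();

    @Override
    protected void doCommitOffsets(Map<String, Long> offsets) {
        commits.add(offsets);
    }

    // Read-only view of the recorded commits, in call order.
    List<Map<String, Long>> getCommits() {
        return Collections.unmodifiableList(commits);
    }
}
```

A test would then assert on `getCommits()` instead of calling `verify(...)` on the final method.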


---

[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5284


---

[GitHub] flink issue #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verification...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5284
  
    Yes, I've fixed that checkstyle on my local pre-merge branch 👌 


---

[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5284#discussion_r161176765
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -265,10 +274,8 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
     
     		// --------------------------------------------------------------------
     
    -		final OneShotLatch runLatch = new OneShotLatch();
    -		final OneShotLatch stopLatch = new OneShotLatch();
    -		final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
    -		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
    +		final MockFetcher<String> fetcher = spy(new MockFetcher<>());
    +		doReturn(state1).doReturn(state2).doReturn(state3).when(fetcher).snapshotCurrentState();
    --- End diff --
    
    Oops, haven't noticed that :)


---

[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5284#discussion_r161159244
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -265,10 +274,8 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
     
     		// --------------------------------------------------------------------
     
    -		final OneShotLatch runLatch = new OneShotLatch();
    -		final OneShotLatch stopLatch = new OneShotLatch();
    -		final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
    -		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
    +		final MockFetcher<String> fetcher = spy(new MockFetcher<>());
    +		doReturn(state1).doReturn(state2).doReturn(state3).when(fetcher).snapshotCurrentState();
    --- End diff --
    
    Maybe we could go one small step further?
    
    ```
    private static class MockFetcher<T> ... {
      private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> stateSnapshotsToReturn = new ArrayDeque<>();
      
      public MockFetcher(HashMap<KafkaTopicPartition, Long>... stateSnapshotsToReturn) {
        this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
      }
    
      @Override
      public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        checkState(!stateSnapshotsToReturn.isEmpty());
        return stateSnapshotsToReturn.poll();
      }
    }
    ```


---

[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5284#discussion_r161160136
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -265,10 +274,8 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
     
     		// --------------------------------------------------------------------
     
    -		final OneShotLatch runLatch = new OneShotLatch();
    -		final OneShotLatch stopLatch = new OneShotLatch();
    -		final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
    -		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
    +		final MockFetcher<String> fetcher = spy(new MockFetcher<>());
    +		doReturn(state1).doReturn(state2).doReturn(state3).when(fetcher).snapshotCurrentState();
    --- End diff --
    
    Yes, if we really want to get rid of Mockito for good :) 
    
    Though `checkState(!stateSnapshotsToReturn.isEmpty())` is a bit problematic, because the unit tests rely on the last stubbed return value being repeated for all remaining calls.
    
    I'll change this while merging.
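
One possible way to keep the queue-based approach while preserving the "last stub repeats" behaviour mentioned above is sketched below. It is only an illustration under stated assumptions, not the change that was merged: `StickySnapshotFetcher` is a hypothetical, standalone class, and `String` keys stand in for `KafkaTopicPartition` so that the example compiles on its own.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;

// Hypothetical standalone stub; not a subclass of the real AbstractFetcher.
class StickySnapshotFetcher {
    private final ArrayDeque<HashMap<String, Long>> stateSnapshotsToReturn = new ArrayDeque<>();

    @SafeVarargs
    StickySnapshotFetcher(HashMap<String, Long>... stateSnapshotsToReturn) {
        if (stateSnapshotsToReturn.length == 0) {
            throw new IllegalArgumentException("at least one snapshot is required");
        }
        this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
    }

    HashMap<String, Long> snapshotCurrentState() {
        // Consume snapshots while more than one remains; once only the last one
        // is left, keep returning it, like Mockito's consecutive stubbing does.
        return stateSnapshotsToReturn.size() > 1
                ? stateSnapshotsToReturn.poll()
                : stateSnapshotsToReturn.peek();
    }
}
```

With two snapshots passed in, the first call returns the first snapshot and every later call returns the second, so no `checkState` on emptiness is needed.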


---