You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/15 23:27:21 UTC

[kafka] branch trunk updated: MINOR: Add unit tests for StreamsRebalanceListener (#9258)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f28713f  MINOR: Add unit tests for StreamsRebalanceListener (#9258)
f28713f is described below

commit f28713f92218f41d21d5149cdc6034fa374821ca
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Wed Sep 16 01:25:19 2020 +0200

    MINOR: Add unit tests for StreamsRebalanceListener (#9258)
    
    Reviewers: Walker Carlson <wc...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../internals/StreamsRebalanceListenerTest.java    | 56 +++++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index cc81fe3..b8ccc94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -16,13 +16,16 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,6 +50,13 @@ public class StreamsRebalanceListenerTest {
         assignmentErrorCode
     );
 
+    @Before
+    public void before() {
+        expect(streamThread.state()).andStubReturn(null);
+        expect(taskManager.activeTaskIds()).andStubReturn(null);
+        expect(taskManager.standbyTaskIds()).andStubReturn(null);
+    }
+
     @Test
     public void shouldThrowMissingSourceTopicException() {
         replay(taskManager, streamThread);
@@ -61,9 +71,9 @@ public class StreamsRebalanceListenerTest {
     }
 
     @Test
-    public void shouldHandleOnPartitionAssigned() {
+    public void shouldHandleAssignedPartitions() {
         taskManager.handleRebalanceComplete();
-        expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(null);
+        expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);
         replay(taskManager, streamThread);
         assignmentErrorCode.set(AssignorError.NONE.code());
 
@@ -71,4 +81,46 @@ public class StreamsRebalanceListenerTest {
 
         verify(taskManager, streamThread);
     }
+
+    @Test
+    public void shouldHandleRevokedPartitions() {
+        final Collection<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0));
+        expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
+        taskManager.handleRevocation(partitions);
+        replay(streamThread, taskManager);
+
+        streamsRebalanceListener.onPartitionsRevoked(partitions);
+
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
+        expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(null);
+        replay(streamThread, taskManager);
+
+        streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0)));
+
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldNotHandleEmptySetOfRevokedPartitions() {
+        expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
+        replay(streamThread, taskManager);
+
+        streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());
+
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldHandleLostPartitions() {
+        taskManager.handleLostAll();
+        replay(streamThread, taskManager);
+
+        streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0)));
+
+        verify(taskManager, streamThread);
+    }
 }
\ No newline at end of file