You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/15 23:27:21 UTC
[kafka] branch trunk updated: MINOR: Add unit tests for StreamsRebalanceListener (#9258)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f28713f MINOR: Add unit tests for StreamsRebalanceListener (#9258)
f28713f is described below
commit f28713f92218f41d21d5149cdc6034fa374821ca
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Wed Sep 16 01:25:19 2020 +0200
MINOR: Add unit tests for StreamsRebalanceListener (#9258)
Reviewers: Walker Carlson <wc...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../internals/StreamsRebalanceListenerTest.java | 56 +++++++++++++++++++++-
1 file changed, 54 insertions(+), 2 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index cc81fe3..b8ccc94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +50,13 @@ public class StreamsRebalanceListenerTest {
assignmentErrorCode
);
+ @Before
+ public void before() {
+ expect(streamThread.state()).andStubReturn(null);
+ expect(taskManager.activeTaskIds()).andStubReturn(null);
+ expect(taskManager.standbyTaskIds()).andStubReturn(null);
+ }
+
@Test
public void shouldThrowMissingSourceTopicException() {
replay(taskManager, streamThread);
@@ -61,9 +71,9 @@ public class StreamsRebalanceListenerTest {
}
@Test
- public void shouldHandleOnPartitionAssigned() {
+ public void shouldHandleAssignedPartitions() {
taskManager.handleRebalanceComplete();
- expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(null);
+ expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);
replay(taskManager, streamThread);
assignmentErrorCode.set(AssignorError.NONE.code());
@@ -71,4 +81,46 @@ public class StreamsRebalanceListenerTest {
verify(taskManager, streamThread);
}
+
+ @Test
+ public void shouldHandleRevokedPartitions() {
+ final Collection<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0));
+ expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
+ taskManager.handleRevocation(partitions);
+ replay(streamThread, taskManager);
+
+ streamsRebalanceListener.onPartitionsRevoked(partitions);
+
+ verify(taskManager, streamThread);
+ }
+
+ @Test
+ public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
+ expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(null);
+ replay(streamThread, taskManager);
+
+ streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0)));
+
+ verify(taskManager, streamThread);
+ }
+
+ @Test
+ public void shouldNotHandleEmptySetOfRevokedPartitions() {
+ expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
+ replay(streamThread, taskManager);
+
+ streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());
+
+ verify(taskManager, streamThread);
+ }
+
+ @Test
+ public void shouldHandleLostPartitions() {
+ taskManager.handleLostAll();
+ replay(streamThread, taskManager);
+
+ streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0)));
+
+ verify(taskManager, streamThread);
+ }
}
\ No newline at end of file