You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/29 13:45:31 UTC

[kafka] branch 2.6 updated: MINOR: regression test for task assignor config (#8743)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 5892e9a  MINOR: regression test for task assignor config (#8743)
5892e9a is described below

commit 5892e9ab46f155eb1dff0b18ede48d6b0f5690a2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu May 28 14:41:07 2020 -0500

    MINOR: regression test for task assignor config (#8743)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../integration/TaskAssignorIntegrationTest.java       | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 329e7bd..8aa5133 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -37,12 +39,14 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -54,6 +58,10 @@ public class TaskAssignorIntegrationTest {
     @Rule
     public TestName testName = new TestName();
 
+    // Just a dummy implementation so we can check the config
+    public static final class MyTaskAssignor extends HighAvailabilityTaskAssignor implements TaskAssignor {
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException {
@@ -87,7 +95,8 @@ public class TaskAssignorIntegrationTest {
                 mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
-                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener)
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
+                mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyTaskAssignor.class.getName())
             )
         );
 
@@ -121,11 +130,18 @@ public class TaskAssignorIntegrationTest {
                 (AssignorConfiguration.AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
 
 
+            final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("taskAssignorSupplier");
+            taskAssignorSupplierField.setAccessible(true);
+            final Supplier<TaskAssignor> taskAssignorSupplier =
+                (Supplier<TaskAssignor>) taskAssignorSupplierField.get(streamsPartitionAssignor);
+            final TaskAssignor taskAssignor = taskAssignorSupplier.get();
+
             assertThat(configs.numStandbyReplicas, is(5));
             assertThat(configs.acceptableRecoveryLag, is(6L));
             assertThat(configs.maxWarmupReplicas, is(7));
             assertThat(configs.probingRebalanceIntervalMs, is(480000L));
             assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener));
+            assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
 }