Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 22:39:24 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

vvcephei commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r436193560



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1013,7 +1014,7 @@ private boolean addClientAssignments(final Map<String, Assignment> assignment,
             final List<TopicPartition> activePartitionsList = new ArrayList<>();
             final List<TaskId> assignedActiveList = new ArrayList<>();
 
-            final boolean tasksRevoked = populateActiveTaskAndPartitionsLists(
+            final Set<TaskId> activeTasksRemovedPendingRevokation = populateActiveTaskAndPartitionsLists(

Review comment:
       Bug 1: in the cooperative algorithm, we can't simply remove active tasks from an assignment, because that causes their state to be discarded (definitely for in-memory stores, and possibly for persistent ones, depending on the state cleaner).
   
   Instead, we convert them to standbys so they can stay warm.
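
A minimal sketch of the idea behind this fix, not the actual assignor code. The class name, the method name, and the use of plain strings as task ids are all hypothetical; the real change works on `TaskId` and the assignment maps inside `StreamsPartitionAssignor`.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only: none of these names exist in Kafka.
final class WarmupSketch {

    /**
     * Moves every task that is intended to be active here, but is still owned
     * by another consumer, out of the active set and into the standby set.
     * Returns the tasks that were moved (those pending revocation).
     */
    static Set<String> demoteUnrevoked(final Set<String> intendedActive,
                                       final Set<String> stillOwnedElsewhere,
                                       final Set<String> standbys) {
        final Set<String> pendingRevocation = new TreeSet<>(intendedActive);
        pendingRevocation.retainAll(stillOwnedElsewhere);

        // Dropping the task entirely would discard its state, so keep it as a
        // standby until the previous owner has revoked it.
        intendedActive.removeAll(pendingRevocation);
        standbys.addAll(pendingRevocation);
        return pendingRevocation;
    }
}
```

With an intended active set of `0_0` and `0_1`, where `0_1` has not yet been revoked by its previous owner, the sketch leaves `0_0` active and turns `0_1` into a standby.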

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -86,7 +87,7 @@
     private boolean rebalanceInProgress = false;  // if we are in the middle of a rebalance, it is not safe to commit
 
     // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
-    private Set<TaskId> lockedTaskDirectories = new HashSet<>();
+    private final Set<TaskId> lockedTaskDirectories = new HashSet<>();

Review comment:
       Just noticed this field could be final.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1634,16 +1636,15 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 
         // The new consumer's assignment should be empty until c1 has the chance to revoke its partitions/tasks
         assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
-        assertThat(
-            AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()),
-            equalTo(new AssignmentInfo(
-                LATEST_SUPPORTED_VERSION,
-                emptyList(),
-                emptyMap(),
-                emptyMap(),
-                emptyMap(),
-                0
-            )));
+
+        final AssignmentInfo actualAssignment = AssignmentInfo.decode(assignment.get(CONSUMER_2).userData());
+        assertThat(actualAssignment.version(), is(LATEST_SUPPORTED_VERSION));
+        assertThat(actualAssignment.activeTasks(), empty());
+        // Note we're not asserting anything about standbys. If the assignor gave an active task to CONSUMER_2, it would
+        // be converted to a standby, but we don't know whether the assignor will do that.

Review comment:
       The fix for Bug 1 necessitated this test change.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -75,21 +101,23 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
 
         final String testId = safeUniqueTestName(getClass(), testName);
         final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
 
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input");
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);

Review comment:
       Just a few cleanups in this first test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
             assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException {

Review comment:
       Here are the new tests, one for in-memory stores and one for persistent stores. We expect exactly the same behavior for both.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -514,17 +515,24 @@ void handleLostAll() {
 
     /**
      * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the
-     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}
-     *
-     * @return Map from task id to its total offset summed across all state stores
+     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
+     * Does not include stateless or non-logged tasks.
      */
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        for (final TaskId id : lockedTaskDirectories) {
+        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
+        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
+        // just have an empty changelogOffsets map.
+        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {

Review comment:
       Bug 2: tasks with only in-memory stores don't get task directories, so we were skipping them here. See the code comment above.
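
A small sketch of why iterating over the union matters, assuming plain strings for task ids. The `union` helper below is a local stand-in written for this example, and the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical illustration only.
final class OffsetSumCandidates {

    /** Local stand-in helper: collects all elements of the given sets into a new set. */
    @SafeVarargs
    static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... sets) {
        final Set<E> result = constructor.get();
        for (final Set<E> set : sets) {
            result.addAll(set);
        }
        return result;
    }

    /** Tasks to report offset sums for: anything with a locked directory or currently owned. */
    static Set<String> candidates(final Set<String> lockedTaskDirectories,
                                  final Set<String> ownedTasks) {
        return union(HashSet::new, lockedTaskDirectories, ownedTasks);
    }
}
```

If `1_0` is an owned task with only in-memory stores, it has no directory and so never appears in the locked set; it is picked up only because the owned tasks are included in the union.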

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -605,13 +613,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
             final long offset = changelogEntry.getValue();
 
 
-            if (offset == Task.LATEST_OFFSET) { // this condition can only be true for active tasks; never for standby
+            if (offset == Task.LATEST_OFFSET) {
+                // this condition can only be true for active tasks; never for standby
                 // for this case, the offset of all partitions is set to `LATEST_OFFSET`
                 // and we "forward" the sentinel value directly
                 return Task.LATEST_OFFSET;
             } else {
                 if (offset < 0) {
-                    throw new IllegalStateException("Offset should not be negative.");
+                    throw new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry);

Review comment:
       I independently implemented this check before Matthias's PR, and preferred this message.
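
For context, a self-contained sketch of the check in this hunk. The class name and the string partition keys are hypothetical, and the sentinel value `-2L` is an assumption made for the example rather than something taken from this diff.

```java
import java.util.Map;

// Hypothetical illustration only; the real method lives in TaskManager.
final class OffsetSumSketch {

    // Assumed sentinel meaning "caught up to the latest offset".
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final Map.Entry<String, Long> changelogEntry : changelogOffsets.entrySet()) {
            final long offset = changelogEntry.getValue();

            if (offset == LATEST_OFFSET) {
                // Forward the sentinel directly instead of summing it.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Any other negative value is an unexpected sentinel; include
                // the whole entry so the offending partition is visible.
                throw new IllegalStateException(
                    "Expected not to get a sentinel offset, but got: " + changelogEntry);
            }
            offsetSum += offset;
        }
        return offsetSum;
    }
}
```

Printing the whole entry, rather than a fixed string, shows both the partition and the offending value in the exception.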



