You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/30 15:58:45 UTC

[kafka] branch 2.2 updated: KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new ed659fe  KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
ed659fe is described below

commit ed659fe73dd652eeb57bba4618ce0054fcadf390
Author: Lifei Chen <li...@allseeingsecurity.com>
AuthorDate: Thu May 30 23:33:37 2019 +0800

    KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
    
    Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
    Do not let the thread transition to RUNNING until all tasks (including standby tasks) are ready.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>,  Bill Bejeck <bb...@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/TaskManagerTest.java       | 108 ++++++++++++---------
 2 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 455d226..86a8e49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -334,7 +334,7 @@ public class TaskManager {
             log.trace("Resuming partitions {}", assignment);
             consumer.resume(assignment);
             assignStandbyPartitions();
-            return true;
+            return standby.allTasksRunning();
         }
         return false;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index f71d7a1..fcf275b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -263,7 +263,7 @@ public class TaskManagerTest {
     @Test
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
-        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+        expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
         active.addNewTask(EasyMock.same(streamTask));
         replay();
 
@@ -276,7 +276,7 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedActiveTasks() {
         checkOrder(active, true);
-        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+        expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
         replay();
 
         taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -289,7 +289,7 @@ public class TaskManagerTest {
     @Test
     public void shouldAddNonResumedStandbyTasks() {
         mockStandbyTaskExpectations();
-        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+        expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
         standby.addNewTask(EasyMock.same(standbyTask));
         replay();
 
@@ -302,7 +302,7 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedStandbyTasks() {
         checkOrder(active, true);
-        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+        expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
         replay();
 
         taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
@@ -316,7 +316,7 @@ public class TaskManagerTest {
     public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
         consumer.pause(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -326,7 +326,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldSuspendActiveTasks() {
-        EasyMock.expect(active.suspend()).andReturn(null);
+        expect(active.suspend()).andReturn(null);
         replay();
 
         taskManager.suspendTasksAndState();
@@ -335,7 +335,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldSuspendStandbyTasks() {
-        EasyMock.expect(standby.suspend()).andReturn(null);
+        expect(standby.suspend()).andReturn(null);
         replay();
 
         taskManager.suspendTasksAndState();
@@ -345,7 +345,7 @@ public class TaskManagerTest {
     @Test
     public void shouldUnassignChangelogPartitionsOnSuspend() {
         restoreConsumer.unsubscribe();
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.suspendTasksAndState();
@@ -354,9 +354,9 @@ public class TaskManagerTest {
 
     @Test
     public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
-        EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
-        EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
-        EasyMock.expectLastCall();
+        expect(active.suspend()).andReturn(new RuntimeException(""));
+        expect(standby.suspend()).andReturn(new RuntimeException(""));
+        expectLastCall();
         restoreConsumer.unsubscribe();
 
         replay();
@@ -372,7 +372,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCloseActiveTasksOnShutdown() {
         active.close(true);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(true);
@@ -382,7 +382,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCloseStandbyTasksOnShutdown() {
         standby.close(false);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(false);
@@ -392,7 +392,7 @@ public class TaskManagerTest {
     @Test
     public void shouldUnassignChangelogPartitionsOnShutdown() {
         restoreConsumer.unsubscribe();
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(true);
@@ -402,7 +402,7 @@ public class TaskManagerTest {
     @Test
     public void shouldInitializeNewActiveTasks() {
         active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -412,7 +412,7 @@ public class TaskManagerTest {
     @Test
     public void shouldInitializeNewStandbyTasks() {
         active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -421,9 +421,9 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+        expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         active.updateRestored(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -432,13 +432,13 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
-        EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
+        expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+        expect(active.allTasksRunning()).andReturn(true);
+        expect(consumer.assignment()).andReturn(taskId0Partitions);
+        expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
 
         consumer.resume(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -450,13 +450,31 @@ public class TaskManagerTest {
         mockAssignStandbyPartitions(1L);
         replay();
 
-        assertTrue(taskManager.updateNewAndRestoringTasks());
+        taskManager.updateNewAndRestoringTasks();
         verify(restoreConsumer);
     }
 
     @Test
+    public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() {
+        mockAssignStandbyPartitions(1L);
+        expect(standby.allTasksRunning()).andReturn(true);
+        replay();
+
+        assertTrue(taskManager.updateNewAndRestoringTasks());
+    }
+
+    @Test
+    public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() {
+        mockAssignStandbyPartitions(1L);
+        expect(standby.allTasksRunning()).andReturn(false);
+        replay();
+
+        assertFalse(taskManager.updateNewAndRestoringTasks());
+    }
+
+    @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
-        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        expect(active.allTasksRunning()).andReturn(false);
         replay();
 
         assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -466,7 +484,7 @@ public class TaskManagerTest {
     public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
         mockAssignStandbyPartitions(1L);
         restoreConsumer.seek(t1p0, 1L);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -477,7 +495,7 @@ public class TaskManagerTest {
     public void shouldSeekToBeginningIfOffsetIsLessThan0() {
         mockAssignStandbyPartitions(-1L);
         restoreConsumer.seekToBeginning(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -486,8 +504,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitActiveAndStandbyTasks() {
-        EasyMock.expect(active.commit()).andReturn(1);
-        EasyMock.expect(standby.commit()).andReturn(2);
+        expect(active.commit()).andReturn(1);
+        expect(standby.commit()).andReturn(2);
 
         replay();
 
@@ -500,7 +518,7 @@ public class TaskManagerTest {
         // upgrade to strict mock to ensure no calls
         checkOrder(standby, true);
         active.commit();
-        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+        expectLastCall().andThrow(new RuntimeException(""));
         replay();
 
         try {
@@ -514,7 +532,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPropagateExceptionFromStandbyCommit() {
-        EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
+        expect(standby.commit()).andThrow(new RuntimeException(""));
         replay();
 
         try {
@@ -534,8 +552,8 @@ public class TaskManagerTest {
 
         futureDeletedRecords.complete(null);
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -549,8 +567,8 @@ public class TaskManagerTest {
         final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
         final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords));
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -567,8 +585,8 @@ public class TaskManagerTest {
 
         futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -578,7 +596,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldMaybeCommitActiveTasks() {
-        EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
+        expect(active.maybeCommitPerUserRequested()).andReturn(5);
         replay();
 
         assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5));
@@ -587,7 +605,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldProcessActiveTasks() {
-        EasyMock.expect(active.process(0L)).andReturn(10);
+        expect(active.process(0L)).andReturn(10);
         replay();
 
         assertThat(taskManager.process(0L), equalTo(10));
@@ -596,7 +614,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPunctuateActiveTasks() {
-        EasyMock.expect(active.punctuate()).andReturn(20);
+        expect(active.punctuate()).andReturn(20);
         replay();
 
         assertThat(taskManager.punctuate(), equalTo(20));
@@ -605,7 +623,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldNotResumeConsumptionUntilAllStoresRestored() {
-        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        expect(active.allTasksRunning()).andReturn(false);
         final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
         taskManager.setConsumer(consumer);
         EasyMock.replay(active, consumer);
@@ -637,12 +655,12 @@ public class TaskManagerTest {
 
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
-        EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
-        EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
+        expect(active.allTasksRunning()).andReturn(true);
+        expect(standby.running()).andReturn(Collections.singletonList(task));
+        expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
         restoreConsumer.assign(taskId0Partitions);
 
-        EasyMock.expectLastCall();
+        expectLastCall();
         EasyMock.replay(task);
     }