You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/30 16:16:44 UTC

[kafka] branch 2.1 updated: KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 7151251  KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
7151251 is described below

commit 715125160a49633de21007fb863c978cbca93938
Author: Lifei Chen <li...@allseeingsecurity.com>
AuthorDate: Thu May 30 23:33:37 2019 +0800

    KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
    
    Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
    Do not let the thread transition to RUNNING until all tasks (including standby tasks) are ready.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/TaskManagerTest.java       | 110 ++++++++++++---------
 2 files changed, 65 insertions(+), 47 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9cc5a19..191968c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,7 +327,7 @@ public class TaskManager {
             log.trace("Resuming partitions {}", assignment);
             consumer.resume(assignment);
             assignStandbyPartitions();
-            return true;
+            return standby.allTasksRunning();
         }
         return false;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b0e7fce..dd1934a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -275,7 +275,7 @@ public class TaskManagerTest {
     @Test
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
-        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+        expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
         active.addNewTask(EasyMock.same(streamTask));
         replay();
 
@@ -288,7 +288,7 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedActiveTasks() {
         checkOrder(active, true);
-        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+        expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
         replay();
 
         taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -301,7 +301,7 @@ public class TaskManagerTest {
     @Test
     public void shouldAddNonResumedStandbyTasks() {
         mockStandbyTaskExpectations();
-        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+        expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
         standby.addNewTask(EasyMock.same(standbyTask));
         replay();
 
@@ -314,7 +314,7 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedStandbyTasks() {
         checkOrder(active, true);
-        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+        expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
         replay();
 
         taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
@@ -328,7 +328,7 @@ public class TaskManagerTest {
     public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
         consumer.pause(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -338,7 +338,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldSuspendActiveTasks() {
-        EasyMock.expect(active.suspend()).andReturn(null);
+        expect(active.suspend()).andReturn(null);
         replay();
 
         taskManager.suspendTasksAndState();
@@ -347,7 +347,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldSuspendStandbyTasks() {
-        EasyMock.expect(standby.suspend()).andReturn(null);
+        expect(standby.suspend()).andReturn(null);
         replay();
 
         taskManager.suspendTasksAndState();
@@ -357,7 +357,7 @@ public class TaskManagerTest {
     @Test
     public void shouldUnassignChangelogPartitionsOnSuspend() {
         restoreConsumer.unsubscribe();
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.suspendTasksAndState();
@@ -366,9 +366,9 @@ public class TaskManagerTest {
 
     @Test
     public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
-        EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
-        EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
-        EasyMock.expectLastCall();
+        expect(active.suspend()).andReturn(new RuntimeException(""));
+        expect(standby.suspend()).andReturn(new RuntimeException(""));
+        expectLastCall();
         restoreConsumer.unsubscribe();
 
         replay();
@@ -384,7 +384,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCloseActiveTasksOnShutdown() {
         active.close(true);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(true);
@@ -394,7 +394,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCloseStandbyTasksOnShutdown() {
         standby.close(false);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(false);
@@ -404,7 +404,7 @@ public class TaskManagerTest {
     @Test
     public void shouldUnassignChangelogPartitionsOnShutdown() {
         restoreConsumer.unsubscribe();
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.shutdown(true);
@@ -414,7 +414,7 @@ public class TaskManagerTest {
     @Test
     public void shouldInitializeNewActiveTasks() {
         active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -424,7 +424,7 @@ public class TaskManagerTest {
     @Test
     public void shouldInitializeNewStandbyTasks() {
         active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -433,9 +433,9 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+        expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         active.updateRestored(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -444,13 +444,13 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
-        EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
+        expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+        expect(active.allTasksRunning()).andReturn(true);
+        expect(consumer.assignment()).andReturn(taskId0Partitions);
+        expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
 
         consumer.resume(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -462,13 +462,31 @@ public class TaskManagerTest {
         mockAssignStandbyPartitions(1L);
         replay();
 
-        assertTrue(taskManager.updateNewAndRestoringTasks());
+        taskManager.updateNewAndRestoringTasks();
         verify(restoreConsumer);
     }
 
     @Test
+    public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() {
+        mockAssignStandbyPartitions(1L);
+        expect(standby.allTasksRunning()).andReturn(true);
+        replay();
+
+        assertTrue(taskManager.updateNewAndRestoringTasks());
+    }
+
+    @Test
+    public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() {
+        mockAssignStandbyPartitions(1L);
+        expect(standby.allTasksRunning()).andReturn(false);
+        replay();
+
+        assertFalse(taskManager.updateNewAndRestoringTasks());
+    }
+
+    @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
-        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        expect(active.allTasksRunning()).andReturn(false);
         replay();
 
         assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -478,7 +496,7 @@ public class TaskManagerTest {
     public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
         mockAssignStandbyPartitions(1L);
         restoreConsumer.seek(t1p0, 1L);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -489,7 +507,7 @@ public class TaskManagerTest {
     public void shouldSeekToBeginningIfOffsetIsLessThan0() {
         mockAssignStandbyPartitions(-1L);
         restoreConsumer.seekToBeginning(taskId0Partitions);
-        EasyMock.expectLastCall();
+        expectLastCall();
         replay();
 
         taskManager.updateNewAndRestoringTasks();
@@ -498,8 +516,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitActiveAndStandbyTasks() {
-        EasyMock.expect(active.commit()).andReturn(1);
-        EasyMock.expect(standby.commit()).andReturn(2);
+        expect(active.commit()).andReturn(1);
+        expect(standby.commit()).andReturn(2);
 
         replay();
 
@@ -512,7 +530,7 @@ public class TaskManagerTest {
         // upgrade to strict mock to ensure no calls
         checkOrder(standby, true);
         active.commit();
-        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+        expectLastCall().andThrow(new RuntimeException(""));
         replay();
 
         try {
@@ -526,7 +544,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPropagateExceptionFromStandbyCommit() {
-        EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
+        expect(standby.commit()).andThrow(new RuntimeException(""));
         replay();
 
         try {
@@ -546,8 +564,8 @@ public class TaskManagerTest {
 
         futureDeletedRecords.complete(null);
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -561,8 +579,8 @@ public class TaskManagerTest {
         final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
         final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords));
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -579,8 +597,8 @@ public class TaskManagerTest {
 
         futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
 
-        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
-        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
         replay();
 
         taskManager.maybePurgeCommitedRecords();
@@ -590,7 +608,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldMaybeCommitActiveTasks() {
-        EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
+        expect(active.maybeCommitPerUserRequested()).andReturn(5);
         replay();
 
         assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5));
@@ -599,7 +617,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldProcessActiveTasks() {
-        EasyMock.expect(active.process(0L)).andReturn(10);
+        expect(active.process(0L)).andReturn(10);
         replay();
 
         assertThat(taskManager.process(0L), equalTo(10));
@@ -608,7 +626,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPunctuateActiveTasks() {
-        EasyMock.expect(active.punctuate()).andReturn(20);
+        expect(active.punctuate()).andReturn(20);
         replay();
 
         assertThat(taskManager.punctuate(), equalTo(20));
@@ -617,8 +635,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldNotResumeConsumptionUntilAllStoresRestored() {
-        EasyMock.expect(active.allTasksRunning()).andReturn(false);
-        final Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) EasyMock.createStrictMock(Consumer.class);
+        expect(active.allTasksRunning()).andReturn(false);
+        final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
         taskManager.setConsumer(consumer);
         EasyMock.replay(active, consumer);
 
@@ -649,12 +667,12 @@ public class TaskManagerTest {
 
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
-        EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
-        EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
+        expect(active.allTasksRunning()).andReturn(true);
+        expect(standby.running()).andReturn(Collections.singletonList(task));
+        expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
         restoreConsumer.assign(taskId0Partitions);
 
-        EasyMock.expectLastCall();
+        expectLastCall();
         EasyMock.replay(task);
     }