You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/30 16:16:44 UTC
[kafka] branch 2.1 updated: KAFKA-8187: Add wait time for other
thread in the same jvm to free the locks (#6818)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 7151251 KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
7151251 is described below
commit 715125160a49633de21007fb863c978cbca93938
Author: Lifei Chen <li...@allseeingsecurity.com>
AuthorDate: Thu May 30 23:33:37 2019 +0800
KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818)
Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
Do not let the thread transition to RUNNING until all tasks (including standby tasks) are ready.
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
.../streams/processor/internals/TaskManager.java | 2 +-
.../processor/internals/TaskManagerTest.java | 110 ++++++++++++---------
2 files changed, 65 insertions(+), 47 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9cc5a19..191968c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,7 +327,7 @@ public class TaskManager {
log.trace("Resuming partitions {}", assignment);
consumer.resume(assignment);
assignStandbyPartitions();
- return true;
+ return standby.allTasksRunning();
}
return false;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b0e7fce..dd1934a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -275,7 +275,7 @@ public class TaskManagerTest {
@Test
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();
- EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+ expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
active.addNewTask(EasyMock.same(streamTask));
replay();
@@ -288,7 +288,7 @@ public class TaskManagerTest {
@Test
public void shouldNotAddResumedActiveTasks() {
checkOrder(active, true);
- EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+ expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -301,7 +301,7 @@ public class TaskManagerTest {
@Test
public void shouldAddNonResumedStandbyTasks() {
mockStandbyTaskExpectations();
- EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
+ expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
standby.addNewTask(EasyMock.same(standbyTask));
replay();
@@ -314,7 +314,7 @@ public class TaskManagerTest {
@Test
public void shouldNotAddResumedStandbyTasks() {
checkOrder(active, true);
- EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
+ expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
replay();
taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment);
@@ -328,7 +328,7 @@ public class TaskManagerTest {
public void shouldPauseActivePartitions() {
mockSingleActiveTask();
consumer.pause(taskId0Partitions);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
@@ -338,7 +338,7 @@ public class TaskManagerTest {
@Test
public void shouldSuspendActiveTasks() {
- EasyMock.expect(active.suspend()).andReturn(null);
+ expect(active.suspend()).andReturn(null);
replay();
taskManager.suspendTasksAndState();
@@ -347,7 +347,7 @@ public class TaskManagerTest {
@Test
public void shouldSuspendStandbyTasks() {
- EasyMock.expect(standby.suspend()).andReturn(null);
+ expect(standby.suspend()).andReturn(null);
replay();
taskManager.suspendTasksAndState();
@@ -357,7 +357,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnSuspend() {
restoreConsumer.unsubscribe();
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.suspendTasksAndState();
@@ -366,9 +366,9 @@ public class TaskManagerTest {
@Test
public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
- EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
- EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
- EasyMock.expectLastCall();
+ expect(active.suspend()).andReturn(new RuntimeException(""));
+ expect(standby.suspend()).andReturn(new RuntimeException(""));
+ expectLastCall();
restoreConsumer.unsubscribe();
replay();
@@ -384,7 +384,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseActiveTasksOnShutdown() {
active.close(true);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.shutdown(true);
@@ -394,7 +394,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseStandbyTasksOnShutdown() {
standby.close(false);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.shutdown(false);
@@ -404,7 +404,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnShutdown() {
restoreConsumer.unsubscribe();
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.shutdown(true);
@@ -414,7 +414,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewActiveTasks() {
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -424,7 +424,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewStandbyTasks() {
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -433,9 +433,9 @@ public class TaskManagerTest {
@Test
public void shouldRestoreStateFromChangeLogReader() {
- EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+ expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
active.updateRestored(taskId0Partitions);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -444,13 +444,13 @@ public class TaskManagerTest {
@Test
public void shouldResumeRestoredPartitions() {
- EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
- EasyMock.expect(active.allTasksRunning()).andReturn(true);
- EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
- EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
+ expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
+ expect(active.allTasksRunning()).andReturn(true);
+ expect(consumer.assignment()).andReturn(taskId0Partitions);
+ expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
consumer.resume(taskId0Partitions);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -462,13 +462,31 @@ public class TaskManagerTest {
mockAssignStandbyPartitions(1L);
replay();
- assertTrue(taskManager.updateNewAndRestoringTasks());
+ taskManager.updateNewAndRestoringTasks();
verify(restoreConsumer);
}
@Test
+ public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() {
+ mockAssignStandbyPartitions(1L);
+ expect(standby.allTasksRunning()).andReturn(true);
+ replay();
+
+ assertTrue(taskManager.updateNewAndRestoringTasks());
+ }
+
+ @Test
+ public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() {
+ mockAssignStandbyPartitions(1L);
+ expect(standby.allTasksRunning()).andReturn(false);
+ replay();
+
+ assertFalse(taskManager.updateNewAndRestoringTasks());
+ }
+
+ @Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
- EasyMock.expect(active.allTasksRunning()).andReturn(false);
+ expect(active.allTasksRunning()).andReturn(false);
replay();
assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -478,7 +496,7 @@ public class TaskManagerTest {
public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
mockAssignStandbyPartitions(1L);
restoreConsumer.seek(t1p0, 1L);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -489,7 +507,7 @@ public class TaskManagerTest {
public void shouldSeekToBeginningIfOffsetIsLessThan0() {
mockAssignStandbyPartitions(-1L);
restoreConsumer.seekToBeginning(taskId0Partitions);
- EasyMock.expectLastCall();
+ expectLastCall();
replay();
taskManager.updateNewAndRestoringTasks();
@@ -498,8 +516,8 @@ public class TaskManagerTest {
@Test
public void shouldCommitActiveAndStandbyTasks() {
- EasyMock.expect(active.commit()).andReturn(1);
- EasyMock.expect(standby.commit()).andReturn(2);
+ expect(active.commit()).andReturn(1);
+ expect(standby.commit()).andReturn(2);
replay();
@@ -512,7 +530,7 @@ public class TaskManagerTest {
// upgrade to strict mock to ensure no calls
checkOrder(standby, true);
active.commit();
- EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+ expectLastCall().andThrow(new RuntimeException(""));
replay();
try {
@@ -526,7 +544,7 @@ public class TaskManagerTest {
@Test
public void shouldPropagateExceptionFromStandbyCommit() {
- EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
+ expect(standby.commit()).andThrow(new RuntimeException(""));
replay();
try {
@@ -546,8 +564,8 @@ public class TaskManagerTest {
futureDeletedRecords.complete(null);
- EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
- EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+ expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+ expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
replay();
taskManager.maybePurgeCommitedRecords();
@@ -561,8 +579,8 @@ public class TaskManagerTest {
final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords));
- EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
- EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
+ expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
+ expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
replay();
taskManager.maybePurgeCommitedRecords();
@@ -579,8 +597,8 @@ public class TaskManagerTest {
futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
- EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
- EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+ expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+ expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
replay();
taskManager.maybePurgeCommitedRecords();
@@ -590,7 +608,7 @@ public class TaskManagerTest {
@Test
public void shouldMaybeCommitActiveTasks() {
- EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
+ expect(active.maybeCommitPerUserRequested()).andReturn(5);
replay();
assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5));
@@ -599,7 +617,7 @@ public class TaskManagerTest {
@Test
public void shouldProcessActiveTasks() {
- EasyMock.expect(active.process(0L)).andReturn(10);
+ expect(active.process(0L)).andReturn(10);
replay();
assertThat(taskManager.process(0L), equalTo(10));
@@ -608,7 +626,7 @@ public class TaskManagerTest {
@Test
public void shouldPunctuateActiveTasks() {
- EasyMock.expect(active.punctuate()).andReturn(20);
+ expect(active.punctuate()).andReturn(20);
replay();
assertThat(taskManager.punctuate(), equalTo(20));
@@ -617,8 +635,8 @@ public class TaskManagerTest {
@Test
public void shouldNotResumeConsumptionUntilAllStoresRestored() {
- EasyMock.expect(active.allTasksRunning()).andReturn(false);
- final Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) EasyMock.createStrictMock(Consumer.class);
+ expect(active.allTasksRunning()).andReturn(false);
+ final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
taskManager.setConsumer(consumer);
EasyMock.replay(active, consumer);
@@ -649,12 +667,12 @@ public class TaskManagerTest {
private void mockAssignStandbyPartitions(final long offset) {
final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
- EasyMock.expect(active.allTasksRunning()).andReturn(true);
- EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
- EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
+ expect(active.allTasksRunning()).andReturn(true);
+ expect(standby.running()).andReturn(Collections.singletonList(task));
+ expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
restoreConsumer.assign(taskId0Partitions);
- EasyMock.expectLastCall();
+ expectLastCall();
EasyMock.replay(task);
}