You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/25 01:58:30 UTC
[kafka] branch trunk updated: KAFKA-10198: guard against recycling dirty state (#8924)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3348fc4 KAFKA-10198: guard against recycling dirty state (#8924)
3348fc4 is described below
commit 3348fc49d8824155e737b866f633e14684da5fe9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Jun 24 18:57:38 2020 -0700
KAFKA-10198: guard against recycling dirty state (#8924)
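We just needed to add the pending-uncommitted-data check performed by StreamTask#closeClean to StreamTask#closeAndRecycleState as well. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: a task must be clean (have no uncommitted data) before its state can be recycled.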
This should be cherry-picked back to the 2.6 branch.
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/ActiveTaskCreator.java | 2 +-
.../streams/processor/internals/StandbyTask.java | 2 +-
.../processor/internals/StandbyTaskCreator.java | 2 +-
.../streams/processor/internals/StreamTask.java | 21 +++++++-----
.../kafka/streams/processor/internals/Task.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 6 ++--
.../processor/internals/StreamTaskTest.java | 39 ++++++++++++++++++----
.../processor/internals/TaskManagerTest.java | 2 +-
8 files changed, 54 insertions(+), 22 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 0a1f47e..012ff20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -184,7 +184,7 @@ class ActiveTaskCreator {
final ProcessorStateManager stateManager = standbyTask.stateMgr;
final LogContext logContext = getLogContext(standbyTask.id);
- standbyTask.closeAndRecycleState();
+ standbyTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
return createActiveTask(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ffd09f1..1aa68bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
- public void closeAndRecycleState() {
+ public void closeCleanAndRecycleState() {
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
stateMgr.recycle();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 443db8e..b5f2b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -112,7 +112,7 @@ class StandbyTaskCreator {
final InternalProcessorContext context = streamTask.processorContext();
final ProcessorStateManager stateManager = streamTask.stateMgr;
- streamTask.closeAndRecycleState();
+ streamTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id()));
return createStandbyTask(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6e8bf40..4b27436 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public void closeClean() {
+ validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(true);
log.info("Closed clean");
@@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
- public void closeAndRecycleState() {
+ public void closeCleanAndRecycleState() {
+ validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
switch (state()) {
case SUSPENDED:
@@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
stateMgr.checkpoint(checkpointableOffsets());
}
- /**
- * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
- */
- private void close(final boolean clean) {
- if (clean && commitNeeded) {
- // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
- // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
+ private void validateClean() {
+ // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
+ // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
+ if (commitNeeded) {
log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
+ " commit and should close as dirty instead");
throw new TaskMigratedException("Tried to close dirty task as clean");
}
+ }
+
+ /**
+ * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
+ */
+ private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
// first close state manager (which is idempotent) then close the record collector
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 0200870..103c231 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -153,7 +153,7 @@ public interface Task {
/**
* Attempt a clean close but do not close the underlying state
*/
- void closeAndRecycleState();
+ void closeCleanAndRecycleState();
/**
* Revive a closed task to a created one; should never throw an exception
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 82f33c4..f98d630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -502,13 +502,13 @@ public class StandbyTaskTest {
EasyMock.replay(stateManager);
task = createStandbyTask();
- assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
+ assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
task.initializeIfNeeded();
- assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
+ assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
task.suspend();
- task.closeAndRecycleState(); // SUSPENDED
+ task.closeCleanAndRecycleState(); // SUSPENDED
// Currently, there are no metrics registered for standby tasks.
// This is a regression test so that, if we add some, we will be sure to deregister them.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c6ffe74..9607470 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@@ -1752,7 +1753,7 @@ public class StreamTaskTest {
}
@Test
- public void shouldUnregisterMetricsInCloseAndRecycle() {
+ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
@@ -1761,7 +1762,7 @@ public class StreamTaskTest {
task.suspend();
assertThat(getTaskMetrics(), not(empty()));
- task.closeAndRecycleState();
+ task.closeCleanAndRecycleState();
assertThat(getTaskMetrics(), empty());
}
@@ -1799,22 +1800,48 @@ public class StreamTaskTest {
}
@Test
+ public void shouldThrowIfCleanClosingDirtyTask() {
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+ task.initializeIfNeeded();
+ task.completeRestoration();
+
+ task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+ task.process(0L);
+ assertTrue(task.commitNeeded());
+
+ assertThrows(TaskMigratedException.class, () -> task.closeClean());
+ }
+
+ @Test
+ public void shouldThrowIfRecyclingDirtyTask() {
+ task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+ task.initializeIfNeeded();
+ task.completeRestoration();
+
+ task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+ task.process(0L);
+ assertTrue(task.commitNeeded());
+
+ assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState());
+ }
+
+ @Test
public void shouldOnlyRecycleSuspendedTasks() {
stateManager.recycle();
recordCollector.closeClean();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig(false, "100"), true);
- assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
+ assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
task.initializeIfNeeded();
- assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING
+ assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING
task.completeRestoration();
- assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
+ assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
task.suspend();
- task.closeAndRecycleState(); // SUSPENDED
+ task.closeCleanAndRecycleState(); // SUSPENDED
EasyMock.verify(stateManager, recordCollector);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fcfbb1f..23166d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2723,7 +2723,7 @@ public class TaskManagerTest {
}
@Override
- public void closeAndRecycleState() {
+ public void closeCleanAndRecycleState() {
transitionTo(State.CLOSED);
}