You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/06 03:20:54 UTC
kafka git commit: KAFKA-5167: Release state locks in case of failure
Repository: kafka
Updated Branches:
refs/heads/trunk 3ab0456db -> 70e949d52
KAFKA-5167: Release state locks in case of failure
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #3449 from mjsax/kafka-5167-streams-task-gets-stuck-after-re-balance-due-to-LockException
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70e949d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70e949d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70e949d5
Branch: refs/heads/trunk
Commit: 70e949d522eba72b22c2619c4fde372d0f1a26b3
Parents: 3ab0456
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jul 5 20:20:51 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 5 20:20:51 2017 -0700
----------------------------------------------------------------------
.../processor/internals/StreamThread.java | 26 ++-
.../processor/internals/StreamThreadTest.java | 213 ++++++++++++++++++-
2 files changed, 231 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/70e949d5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f81773b..efdbeeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1147,13 +1147,31 @@ public class StreamThread extends Thread {
@Override
public void apply(final StreamTask task) {
- task.suspend();
+ try {
+ task.suspend();
+ } catch (final Exception e) {
+ try {
+ task.close(false);
+ } catch (final Exception f) {
+ log.error("{} Closing task {} failed: ", logPrefix, task.id, f);
+ }
+ throw e;
+ }
}
}));
for (final StandbyTask task : standbyTasks.values()) {
try {
- task.suspend();
+ try {
+ task.suspend();
+ } catch (final Exception e) {
+ try {
+ task.close(false);
+ } catch (final Exception f) {
+ log.error("{} Closing standby task {} failed: ", logPrefix, task.id, f);
+ }
+ throw e;
+ }
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
}
@@ -1257,6 +1275,7 @@ public class StreamThread extends Thread {
}
}
+ // visible for testing
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
streamsMetrics.taskCreatedSensor.record();
@@ -1343,7 +1362,8 @@ public class StreamThread extends Thread {
taskCreator.retryWithBackoff(newTasks, start);
}
- private StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
+ // visible for testing
+ protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
streamsMetrics.taskCreatedSensor.record();
final ProcessorTopology topology = builder.build(id.topicGroupId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/70e949d5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5a31ccd..a0882cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -542,9 +542,16 @@ public class StreamThreadTest {
private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
+ private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;
MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
+ this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ }
+
+ MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment,
+ final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) {
this.activeTaskAssignment = activeTaskAssignment;
+ this.standbyTaskAssignment = standbyTaskAssignment;
}
@Override
@@ -553,6 +560,11 @@ public class StreamThreadTest {
}
@Override
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTaskAssignment;
+ }
+
+ @Override
public void close() {}
}
@@ -1249,10 +1261,10 @@ public class StreamThreadTest {
}
});
- StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
- Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+ final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+ final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
updatedTopicsField.setAccessible(true);
- Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+ final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
updatedTopics.add(t1.topic());
builder.updateSubscriptions(subscriptionUpdates, null);
@@ -1685,13 +1697,13 @@ public class StreamThreadTest {
final TaskId taskId2 = new TaskId(0, 0);
final TaskId taskId3 = new TaskId(0, 0);
- List<TaskId> activeTasks = Arrays.asList(taskId1);
+ List<TaskId> activeTasks = Utils.mkList(taskId1);
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
- topicPartitions.addAll(Arrays.asList(topicPartition1));
+ topicPartitions.addAll(Utils.mkList(topicPartition1));
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
partitionAssignor.onAssignment(assignment);
@@ -1726,6 +1738,197 @@ public class StreamThreadTest {
}
+ @Test
+ public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final StreamThread thread = setupTest(taskId);
+
+ final StateDirectory testStateDir = new StateDirectory(
+ applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ mockTime);
+
+ assertFalse(testStateDir.lock(taskId, 0));
+ try {
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ fail("Should have thrown exception");
+ } catch (final Exception e) {
+ assertTrue(testStateDir.lock(taskId, 0));
+ } finally {
+ testStateDir.unlock(taskId);
+ }
+ }
+
+ @Test
+ public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final StreamThread thread = setupTest(taskId);
+ thread.start();
+
+ final StateDirectory testStateDir = new StateDirectory(
+ applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ mockTime);
+
+ assertFalse(testStateDir.lock(taskId, 0));
+ try {
+ thread.close();
+ thread.join();
+ assertTrue(testStateDir.lock(taskId, 0));
+ } finally {
+ testStateDir.unlock(taskId);
+ }
+ }
+
+ private StreamThread setupTest(final TaskId taskId) {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.setApplicationId(applicationId);
+ builder.addSource("source", "topic");
+
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+ final StateDirectory stateDirectory = new StateDirectory(
+ applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ mockTime);
+
+ final TestStreamTask testStreamTask = new TestStreamTask(taskId,
+ applicationId,
+ Utils.mkSet(new TopicPartition("topic", 0)),
+ builder.build(0),
+ clientSupplier.consumer,
+ clientSupplier.getProducer(new HashMap<String, Object>()),
+ clientSupplier.restoreConsumer,
+ config,
+ new MockStreamsMetrics(new Metrics()),
+ stateDirectory) {
+
+ @Override
+ public void suspend() {
+ throw new RuntimeException("KABOOM!!!");
+ }
+ };
+
+ final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+ clientId, processId, new Metrics(), new MockTime(),
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+ @Override
+ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+ return testStreamTask;
+ }
+ };
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+ thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+ thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+
+ return thread;
+ }
+
+ @Test
+ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final StreamThread thread = setupStandbyTest(taskId);
+
+ final StateDirectory testStateDir = new StateDirectory(applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ mockTime);
+
+ assertFalse(testStateDir.lock(taskId, 0));
+ try {
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ fail("Should have thrown exception");
+ } catch (final Exception e) {
+ assertTrue(testStateDir.lock(taskId, 0));
+ } finally {
+ testStateDir.unlock(taskId);
+ }
+ }
+
+ @Test
+ public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ final StreamThread thread = setupStandbyTest(taskId);
+ thread.start();
+
+ final StateDirectory testStateDir = new StateDirectory(applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ mockTime);
+
+ assertFalse(testStateDir.lock(taskId, 0));
+ try {
+ thread.close();
+ thread.join();
+ assertTrue(testStateDir.lock(taskId, 0));
+ } finally {
+ testStateDir.unlock(taskId);
+ }
+ }
+
+ private StreamThread setupStandbyTest(final TaskId taskId) {
+ final String storeName = "store";
+ final String changelogTopic = applicationId + "-" + storeName + "-changelog";
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId(applicationId);
+ builder.stream("topic1").groupByKey().count(storeName);
+
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+ clientSupplier.restoreConsumer.updatePartitions(changelogTopic,
+ Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null)));
+ clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {
+ {
+ put(new TopicPartition(changelogTopic, 0), 0L);
+ }
+ });
+ clientSupplier.restoreConsumer.updateEndOffsets(new HashMap<TopicPartition, Long>() {
+ {
+ put(new TopicPartition(changelogTopic, 0), 0L);
+ }
+ });
+
+ final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+ clientId, processId, new Metrics(), new MockTime(),
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+
+ @Override
+ protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
+ return new StandbyTask(
+ taskId,
+ applicationId,
+ partitions,
+ builder.build(0),
+ clientSupplier.consumer,
+ new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000),
+ StreamThreadTest.this.config,
+ new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()),
+ stateDirectory) {
+
+ @Override
+ public void suspend() {
+ throw new RuntimeException("KABOOM!!!");
+ }
+
+ @Override
+ public void commit() {
+ throw new RuntimeException("KABOOM!!!");
+ }
+ };
+ }
+ };
+
+ final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
+ thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
+
+ return thread;
+ }
+
private void initPartitionGrouper(final StreamsConfig config,
final StreamThread thread,
final MockClientSupplier clientSupplier) {