You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/06 03:20:54 UTC

kafka git commit: KAFKA-5167: Release state locks in case of failure

Repository: kafka
Updated Branches:
  refs/heads/trunk 3ab0456db -> 70e949d52


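KAFKA-5167: Release state locks in case of failure

If suspending an active or standby task throws, close the task so that its state-directory lock is released; otherwise the lock stays held and the task can get stuck after a rebalance with a LockException.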

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #3449 from mjsax/kafka-5167-streams-task-gets-stuck-after-re-balance-due-to-LockException


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70e949d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70e949d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70e949d5

Branch: refs/heads/trunk
Commit: 70e949d522eba72b22c2619c4fde372d0f1a26b3
Parents: 3ab0456
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jul 5 20:20:51 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 5 20:20:51 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       |  26 ++-
 .../processor/internals/StreamThreadTest.java   | 213 ++++++++++++++++++-
 2 files changed, 231 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/70e949d5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f81773b..efdbeeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1147,13 +1147,31 @@ public class StreamThread extends Thread {
 
             @Override
             public void apply(final StreamTask task) {
-                task.suspend();
+                try {
+                    task.suspend();
+                } catch (final Exception e) {
+                    try {
+                        task.close(false);
+                    } catch (final Exception f) {
+                        log.error("{} Closing task {} failed: ", logPrefix, task.id, f);
+                    }
+                    throw e;
+                }
             }
         }));
 
         for (final StandbyTask task : standbyTasks.values()) {
             try {
-                task.suspend();
+                try {
+                    task.suspend();
+                } catch (final Exception e) {
+                    try {
+                        task.close(false);
+                    } catch (final Exception f) {
+                        log.error("{} Closing standby task {} failed: ", logPrefix, task.id, f);
+                    }
+                    throw e;
+                }
             } catch (final RuntimeException e) {
                 firstException.compareAndSet(null, e);
             }
@@ -1257,6 +1275,7 @@ public class StreamThread extends Thread {
         }
     }
 
+    // visible for testing
     protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
         streamsMetrics.taskCreatedSensor.record();
 
@@ -1343,7 +1362,8 @@ public class StreamThread extends Thread {
         taskCreator.retryWithBackoff(newTasks, start);
     }
 
-    private StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
+    // visible for testing
+    protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
         streamsMetrics.taskCreatedSensor.record();
 
         final ProcessorTopology topology = builder.build(id.topicGroupId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/70e949d5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5a31ccd..a0882cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -542,9 +542,16 @@ public class StreamThreadTest {
     private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
 
         private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
+        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;
 
         MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
+            this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        }
+
+        MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment,
+                                     final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) {
             this.activeTaskAssignment = activeTaskAssignment;
+            this.standbyTaskAssignment = standbyTaskAssignment;
         }
 
         @Override
@@ -553,6 +560,11 @@ public class StreamThreadTest {
         }
 
         @Override
+        Map<TaskId, Set<TopicPartition>> standbyTasks() {
+            return standbyTaskAssignment;
+        }
+
+        @Override
         public void close() {}
     }
 
@@ -1249,10 +1261,10 @@ public class StreamThreadTest {
             }
         });
 
-        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
-        Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
-        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
         updatedTopics.add(t1.topic());
         builder.updateSubscriptions(subscriptionUpdates, null);
 
@@ -1685,13 +1697,13 @@ public class StreamThreadTest {
         final TaskId taskId2 = new TaskId(0, 0);
         final TaskId taskId3 = new TaskId(0, 0);
 
-        List<TaskId> activeTasks = Arrays.asList(taskId1);
+        List<TaskId> activeTasks = Utils.mkList(taskId1);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
         AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
 
-        topicPartitions.addAll(Arrays.asList(topicPartition1));
+        topicPartitions.addAll(Utils.mkList(topicPartition1));
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
         partitionAssignor.onAssignment(assignment);
 
@@ -1726,6 +1738,197 @@ public class StreamThreadTest {
 
     }
 
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final StreamThread thread = setupTest(taskId);
+
+        final StateDirectory testStateDir = new StateDirectory(
+            applicationId,
+            config.getString(StreamsConfig.STATE_DIR_CONFIG),
+            mockTime);
+
+        assertFalse(testStateDir.lock(taskId, 0));
+        try {
+            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+            fail("Should have thrown exception");
+        } catch (final Exception e) {
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
+
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final StreamThread thread = setupTest(taskId);
+        thread.start();
+
+        final StateDirectory testStateDir = new StateDirectory(
+            applicationId,
+            config.getString(StreamsConfig.STATE_DIR_CONFIG),
+            mockTime);
+
+        assertFalse(testStateDir.lock(taskId, 0));
+        try {
+            thread.close();
+            thread.join();
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
+
+    private StreamThread setupTest(final TaskId taskId) {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
+        builder.addSource("source", "topic");
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final StateDirectory stateDirectory = new StateDirectory(
+            applicationId,
+            config.getString(StreamsConfig.STATE_DIR_CONFIG),
+            mockTime);
+
+        final TestStreamTask testStreamTask = new TestStreamTask(taskId,
+            applicationId,
+            Utils.mkSet(new TopicPartition("topic", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory) {
+
+            @Override
+            public void suspend() {
+                throw new RuntimeException("KABOOM!!!");
+            }
+        };
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+            clientId, processId, new Metrics(), new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return testStreamTask;
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+
+        return thread;
+    }
+
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final StreamThread thread = setupStandbyTest(taskId);
+
+        final StateDirectory testStateDir = new StateDirectory(applicationId,
+            config.getString(StreamsConfig.STATE_DIR_CONFIG),
+            mockTime);
+
+        assertFalse(testStateDir.lock(taskId, 0));
+        try {
+            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+            fail("Should have thrown exception");
+        } catch (final Exception e) {
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
+
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final StreamThread thread = setupStandbyTest(taskId);
+        thread.start();
+
+        final StateDirectory testStateDir = new StateDirectory(applicationId,
+            config.getString(StreamsConfig.STATE_DIR_CONFIG),
+            mockTime);
+
+        assertFalse(testStateDir.lock(taskId, 0));
+        try {
+            thread.close();
+            thread.join();
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
+
+    private StreamThread setupStandbyTest(final TaskId taskId) {
+        final String storeName = "store";
+        final String changelogTopic = applicationId + "-" + storeName + "-changelog";
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        builder.stream("topic1").groupByKey().count(storeName);
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        clientSupplier.restoreConsumer.updatePartitions(changelogTopic,
+            Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null)));
+        clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {
+            {
+                put(new TopicPartition(changelogTopic, 0), 0L);
+            }
+        });
+        clientSupplier.restoreConsumer.updateEndOffsets(new HashMap<TopicPartition, Long>() {
+            {
+                put(new TopicPartition(changelogTopic, 0), 0L);
+            }
+        });
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+            clientId, processId, new Metrics(), new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+
+            @Override
+            protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return new StandbyTask(
+                    taskId,
+                    applicationId,
+                    partitions,
+                    builder.build(0),
+                    clientSupplier.consumer,
+                    new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000),
+                    StreamThreadTest.this.config,
+                    new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()),
+                    stateDirectory) {
+
+                    @Override
+                    public void suspend() {
+                        throw new RuntimeException("KABOOM!!!");
+                    }
+
+                    @Override
+                    public void commit() {
+                        throw new RuntimeException("KABOOM!!!");
+                    }
+                };
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
+        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
+
+        return thread;
+    }
+
     private void initPartitionGrouper(final StreamsConfig config,
                                       final StreamThread thread,
                                       final MockClientSupplier clientSupplier) {