Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/01 16:01:00 UTC

[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted

    [ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459799#comment-16459799 ] 

ASF GitHub Bot commented on KAFKA-5896:
---------------------------------------

56quarters closed pull request #3876: KAFKA-5896: Force Connect tasks to stop via thread interruption
URL: https://github.com/apache/kafka/pull/3876
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c6e2e173834..b9f96128f6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -55,6 +55,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 
 /**
@@ -83,6 +84,9 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, Future<?>> futures = new ConcurrentHashMap<>();
+    private final Object taskAndFutureLock = new Object();
+
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
     public Worker(
@@ -414,11 +418,14 @@ public boolean startTask(
             return false;
         }
 
-        WorkerTask existing = tasks.putIfAbsent(id, workerTask);
-        if (existing != null)
-            throw new ConnectException("Task already exists in this worker: " + id);
+        synchronized (taskAndFutureLock) {
+            WorkerTask existing = tasks.putIfAbsent(id, workerTask);
+            if (existing != null)
+                throw new ConnectException("Task already exists in this worker: " + id);
+
+            futures.put(id, executor.submit(workerTask));
+        }
 
-        executor.submit(workerTask);
         if (workerTask instanceof WorkerSourceTask) {
             sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
         }
@@ -483,18 +490,41 @@ private void stopTasks(Collection<ConnectorTaskId> ids) {
     }
 
     private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
-        WorkerTask task = tasks.remove(taskId);
+        WorkerTask task;
+        Future<?> future;
+
+        synchronized (taskAndFutureLock) {
+            task = tasks.remove(taskId);
+            future = futures.remove(taskId);
+        }
+
         if (task == null) {
             log.warn("Ignoring await stop request for non-present task {}", taskId);
             return;
         }
 
         if (!task.awaitStop(timeout)) {
-            log.error("Graceful stop of task {} failed.", task.id());
+            log.error("Graceful stop of task {} failed. Cancelling and forcibly interrupting.", task.id());
             task.cancel();
+
+            if (future == null) {
+                log.warn("No associated Future found for task {}", taskId);
+                return;
+            }
+
+            // Interrupt the thread that the task is running in since it hasn't stopped on its
+            // own by this point. This prevents scenarios where a task runs indefinitely because
+            // it's blocked on something (lock, network I/O, etc.).
+            future.cancel(true);
         }
     }
 
+    // Visible for testing
+    boolean isTaskFutureRunning(ConnectorTaskId taskId) {
+        final Future<?> future = futures.get(taskId);
+        return future != null && !future.isDone();
+    }
+
     private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
         long now = time.milliseconds();
         long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 80c65df7ff4..361cec77a94 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -34,6 +34,8 @@
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -57,14 +59,18 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -782,6 +788,71 @@ public void testConverterOverrides() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testStopHungConnectorTask() throws Exception {
+        expectConverters(true);
+        expectStartStorage();
+
+        EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+
+        Map<String, String> taskProps = new HashMap<>();
+        taskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestHangSinkTask.class.getName());
+        taskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
+
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerHangTestConnector.class.getName());
+        connectorProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+
+        CountDownLatch started = new CountDownLatch(1);
+        SinkTask task = new TestHangSinkTask(started);
+        EasyMock.expect(plugins.newTask(TestHangSinkTask.class)).andReturn(task);
+
+        EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
+        EasyMock.expect(delegatingLoader.connectorLoader(WorkerHangTestConnector.class.getName()))
+                .andReturn(pluginLoader);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader)
+                .times(3);
+
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader)
+                .times(3);
+
+        taskStatusListener.onStartup(TASK_ID);
+        EasyMock.expectLastCall();
+
+        taskStatusListener.onShutdown(TASK_ID);
+        EasyMock.expectLastCall();
+
+        expectStopStorage();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+        worker.start();
+
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        assertFalse(worker.isTaskFutureRunning(TASK_ID));
+
+        // Start running the task in an executor and attempt to wait a little while until
+        // it starts to block indefinitely. This ensures that our call to .stopAndAwaitTask()
+        // will have to force the task to stop by interrupting the thread.
+        worker.startTask(TASK_ID, connectorProps, taskProps, taskStatusListener, TargetState.STARTED);
+        started.await(10, TimeUnit.SECONDS);
+
+        assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
+        assertTrue(worker.isTaskFutureRunning(TASK_ID));
+
+        worker.stopAndAwaitTask(TASK_ID);
+
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        assertFalse(worker.isTaskFutureRunning(TASK_ID));
+
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
     private void assertStatistics(Worker worker, int connectors, int tasks) {
         MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
         assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);
@@ -968,4 +1039,68 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
             return null;
         }
     }
+
+    static class WorkerHangTestConnector extends SinkConnector {
+
+        @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return TestHangSinkTask.class;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef();
+        }
+    }
+
+    static class TestHangSinkTask extends SinkTask {
+        private CountDownLatch hung;
+        private CountDownLatch started;
+
+        TestHangSinkTask(CountDownLatch started) {
+            this.started = started;
+        }
+
+        @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            this.hung = new CountDownLatch(1);
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+            try {
+                started.countDown();
+                hung.await();
+            } catch (InterruptedException e) {
+                // nothing
+            }
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
 }
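
The core of the patch above is that Worker now keeps the Future returned by executor.submit(workerTask). When task.awaitStop(timeout) returns false, it calls future.cancel(true), which interrupts the thread running the task. Below is a minimal, self-contained sketch of that "flag first, interrupt second" stop. It assumes a task that blocks on a CountDownLatch as a stand-in for a lock or I/O with no timeout. HangingTask, stopTask and ForcedStopSketch are hypothetical names and are not Connect APIs. Unlike TestHangSinkTask in the diff, the sketch restores the interrupt status rather than swallowing it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ForcedStopSketch {

    /** Hypothetical task: checks a stop flag, but can block indefinitely between checks. */
    static class HangingTask implements Runnable {
        volatile boolean stopping = false;
        volatile boolean sawInterrupt = false;
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch exited = new CountDownLatch(1);
        private final CountDownLatch neverReleased = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (!stopping) {
                    started.countDown();
                    // Stands in for a lock or network read with no timeout:
                    // setting 'stopping' alone can never end this wait.
                    neverReleased.await();
                }
            } catch (InterruptedException e) {
                sawInterrupt = true;
                // Restore the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                exited.countDown();
            }
        }

        /** Loosely mirrors WorkerTask.awaitStop(timeout): true if run() exited in time. */
        boolean awaitStop(long timeoutMs) throws InterruptedException {
            return exited.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Flag first, interrupt second. Returns true only if the graceful stop succeeded. */
    static boolean stopTask(HangingTask task, Future<?> future, long timeoutMs)
            throws InterruptedException {
        task.stopping = true;          // graceful request via the flag
        if (task.awaitStop(timeoutMs)) {
            return true;
        }
        future.cancel(true);           // forced stop: interrupts the executor thread
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        HangingTask task = new HangingTask();
        Future<?> future = executor.submit(task);   // keep the Future, as the patch does
        task.started.await(5, TimeUnit.SECONDS);    // wait until the task is about to block

        boolean graceful = stopTask(task, future, 200);
        boolean exited = task.awaitStop(5_000);
        System.out.println("graceful=" + graceful + " exited=" + exited
                + " interrupted=" + task.sawInterrupt + " cancelled=" + future.isCancelled());
        executor.shutdown();
    }
}
```

Running main should print graceful=false exited=true interrupted=true cancelled=true. Interruption is still cooperative. It only helps when the blocking call responds to interrupts, as CountDownLatch.await, Object.wait and interruptible NIO channels do. A thread waiting to enter a synchronized block, or one blocked in a classic java.net socket read on a platform thread, will generally not wake up. The two-step stop therefore narrows the problem rather than eliminating it.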


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> Kafka Connect task threads never interrupted
> --------------------------------------------
>
>                 Key: KAFKA-5896
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5896
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Nick Pillitteri
>            Assignee: Nick Pillitteri
>            Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. When tasks are stopped or restarted, a flag, {{stopping}}, is set to indicate that the task should stop processing records. However, if the task's thread is blocked (waiting for a lock or performing I/O), the task may never stop.
> I've created a connector specifically to demonstrate this issue (along with some more detailed instructions for reproducing the issue): https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved connector (any connector that does I/O without timeouts) can cause the Kafka Connect worker to get into a state where the only solution is to restart the JVM.
> I suspect, though I couldn't reproduce it, that this is the cause of this problem reported on Stack Overflow: https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is running in. In the past, across various other libraries, this is what I've seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run indefinitely. It uses a timeout while waiting for the task to stop, but after this timeout has expired it simply sets a {{cancelled}} flag. This means that every time a task is restarted, a new thread running the task is created, so a task may end up with multiple instances, each running in its own thread, when only a single thread is supposed to exist.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to shut down gracefully: https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and submit a PR if people agree that this is a bug and interrupting threads is the right fix.
> Thanks!
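
The "Actual Result" described in the issue can be reproduced in miniature. If stopping a task only sets a flag, each restart leaves the previous instance blocked on its own thread. Below is a minimal sketch under stated assumptions. BlockingTask, restartWithFlagOnly and FlagOnlyRestartSketch are hypothetical names. A CountDownLatch.await() that is never released stands in for I/O without a timeout. A cached thread pool stands in for the Worker's executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FlagOnlyRestartSketch {

    /** Hypothetical task: honours a stop flag only between blocking calls. */
    static class BlockingTask implements Runnable {
        volatile boolean stopping = false;
        final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch neverReleased = new CountDownLatch(1);
        private final AtomicInteger running;   // instances whose run() has not returned

        BlockingTask(AtomicInteger running) {
            this.running = running;
        }

        @Override
        public void run() {
            running.incrementAndGet();
            try {
                while (!stopping) {
                    started.countDown();
                    neverReleased.await();      // stands in for I/O without a timeout
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }
    }

    /** Restarts a task 'restarts' times, stopping each old instance only via its flag. */
    static int restartWithFlagOnly(ExecutorService executor, int restarts)
            throws InterruptedException {
        AtomicInteger running = new AtomicInteger();
        BlockingTask current = new BlockingTask(running);
        executor.submit(current);
        current.started.await(5, TimeUnit.SECONDS);
        for (int i = 0; i < restarts; i++) {
            current.stopping = true;               // "stop": sets the flag, nothing else
            current = new BlockingTask(running);   // "start": a fresh instance
            executor.submit(current);              // runs on a new pool thread
            current.started.await(5, TimeUnit.SECONDS);
        }
        return running.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        int live = restartWithFlagOnly(executor, 3);
        System.out.println("live task instances: " + live);
        executor.shutdownNow();                    // interrupts every pool thread
    }
}
```

With three restarts, main should print live task instances: 4, although only one instance is meant to be running. The final shutdownNow() call interrupts all of them. The proposed fix uses the same interruption mechanism, but applies it per task through the Future kept for each one.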



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)