You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/11 03:39:35 UTC

[kafka] 01/02: KAFKA-9066: Retain metrics for failed tasks (#8502)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit eb58b61603e8bdb0ccf94af3f956a09b5591d5b3
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jun 10 20:03:25 2020 -0700

    KAFKA-9066: Retain metrics for failed tasks (#8502)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  7 +++++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 +++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++--
 .../apache/kafka/connect/runtime/WorkerTask.java   | 27 ++++++++--------------
 .../kafka/connect/runtime/WorkerTaskTest.java      |  9 --------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 12 ++++++++++
 .../runtime/WorkerWithTopicCreationTest.java       | 12 ++++++++++
 7 files changed, 52 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 26b0444..67a27e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -789,13 +789,18 @@ public class Worker {
                 return;
             }
 
-            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (!task.awaitStop(timeout)) {
                 log.error("Graceful stop of task {} failed.", task.id());
                 task.cancel();
             } else {
                 log.debug("Graceful stop of task {} succeeded.", task.id());
             }
+
+            try {
+                task.removeMetrics();
+            } finally {
+                connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+            }
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c22ce4a..50efffc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -174,8 +174,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sinkTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sinkTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e44d93a..1febd7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -188,8 +188,12 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sourceTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sourceTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b0a6a0c..11b0746 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
+    /**
+     * Remove all metrics published by this task.
+     */
+    public void removeMetrics() {
+        taskMetricsGroup.close();
+    }
+
     protected abstract void execute();
 
     protected abstract void close();
 
-    /**
-     * Method called when this worker task has been completely closed, and when the subclass should clean up
-     * all resources.
-     */
-    protected abstract void releaseResources();
-
     protected boolean isStopping() {
         return stopping;
     }
@@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable {
                 if (t instanceof Error)
                     throw (Error) t;
             } finally {
-                try {
-                    Thread.currentThread().setName(savedName);
-                    Plugins.compareAndSwapLoaders(savedLoader);
-                    shutdownLatch.countDown();
-                } finally {
-                    try {
-                        releaseResources();
-                    } finally {
-                        taskMetricsGroup.close();
-                    }
-                }
+                Thread.currentThread().setName(savedName);
+                Plugins.compareAndSwapLoaders(savedLoader);
+                shutdownLatch.countDown();
             }
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 44c45d5..c26a88c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -109,9 +109,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         statusListener.onShutdown(taskId);
         expectLastCall();
 
@@ -153,9 +150,6 @@ public class WorkerTaskTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         replay(workerTask);
 
         workerTask.initialize(TASK_CONFIG);
@@ -219,9 +213,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         // there should be no call to onShutdown()
 
         replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 48ab58f..5177acf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -602,6 +602,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -677,6 +680,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -890,6 +896,9 @@ public class WorkerTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -964,6 +973,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index 8b54033..f698a30 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -594,6 +594,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -669,6 +672,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -879,6 +885,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -953,6 +962,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();