You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/08/12 22:42:49 UTC

[kafka] branch 2.5 updated: KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 89978e1  KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)
89978e1 is described below

commit 89978e1dcfcc8873f2898ae47107b897c2813196
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Wed Aug 12 17:42:12 2020 -0500

    KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  7 +++++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 +++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++--
 .../apache/kafka/connect/runtime/WorkerTask.java   | 27 ++++++++--------------
 .../kafka/connect/runtime/WorkerTaskTest.java      |  9 --------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 12 ++++++++++
 6 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 99e44d4..266a597 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -733,13 +733,18 @@ public class Worker {
                 return;
             }
 
-            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (!task.awaitStop(timeout)) {
                 log.error("Graceful stop of task {} failed.", task.id());
                 task.cancel();
             } else {
                 log.debug("Graceful stop of task {} succeeded.", task.id());
             }
+
+            try {
+                task.removeMetrics();
+            } finally {
+                connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+            }
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8ec6db1..5db4030 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -180,8 +180,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sinkTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sinkTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index dc74747..31cbe5f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -175,8 +175,12 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sourceTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sourceTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b0a6a0c..11b0746 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
+    /**
+     * Remove all metrics published by this task.
+     */
+    public void removeMetrics() {
+        taskMetricsGroup.close();
+    }
+
     protected abstract void execute();
 
     protected abstract void close();
 
-    /**
-     * Method called when this worker task has been completely closed, and when the subclass should clean up
-     * all resources.
-     */
-    protected abstract void releaseResources();
-
     protected boolean isStopping() {
         return stopping;
     }
@@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable {
                 if (t instanceof Error)
                     throw (Error) t;
             } finally {
-                try {
-                    Thread.currentThread().setName(savedName);
-                    Plugins.compareAndSwapLoaders(savedLoader);
-                    shutdownLatch.countDown();
-                } finally {
-                    try {
-                        releaseResources();
-                    } finally {
-                        taskMetricsGroup.close();
-                    }
-                }
+                Thread.currentThread().setName(savedName);
+                Plugins.compareAndSwapLoaders(savedLoader);
+                shutdownLatch.countDown();
             }
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 44c45d5..c26a88c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -109,9 +109,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         statusListener.onShutdown(taskId);
         expectLastCall();
 
@@ -153,9 +150,6 @@ public class WorkerTaskTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         replay(workerTask);
 
         workerTask.initialize(TASK_CONFIG);
@@ -219,9 +213,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         // there should be no call to onShutdown()
 
         replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 1e8c0bc..a15dffb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -612,6 +612,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();
@@ -703,6 +706,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -929,6 +935,9 @@ public class WorkerTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();
@@ -1025,6 +1034,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();