You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/30 22:45:20 UTC

[kafka] branch 2.4 updated: KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new ef96ac0  KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)
ef96ac0 is described below

commit ef96ac07f565a73e35c5b0f4c56c8e87cfbaaf59
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Jul 30 17:44:25 2020 -0500

    KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  7 +++++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 +++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++--
 .../apache/kafka/connect/runtime/WorkerTask.java   | 27 ++++++++--------------
 .../kafka/connect/runtime/WorkerTaskTest.java      |  9 --------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 12 ++++++++++
 6 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3cf4d01..9c02f8f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -733,13 +733,18 @@ public class Worker {
                 return;
             }
 
-            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (!task.awaitStop(timeout)) {
                 log.error("Graceful stop of task {} failed.", task.id());
                 task.cancel();
             } else {
                 log.debug("Graceful stop of task {} succeeded.", task.id());
             }
+
+            try {
+                task.removeMetrics();
+            } finally {
+                connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+            }
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 0fa28e6..c0d9264 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -176,8 +176,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sinkTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sinkTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index c3739b5..fdf1266 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -170,8 +170,12 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sourceTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sourceTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 05d18ff..83a7664 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -137,16 +137,17 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
+    /**
+     * Remove all metrics published by this task.
+     */
+    public void removeMetrics() {
+        taskMetricsGroup.close();
+    }
+
     protected abstract void execute();
 
     protected abstract void close();
 
-    /**
-     * Method called when this worker task has been completely closed, and when the subclass should clean up
-     * all resources.
-     */
-    protected abstract void releaseResources();
-
     protected boolean isStopping() {
         return stopping;
     }
@@ -232,17 +233,9 @@ abstract class WorkerTask implements Runnable {
                 if (t instanceof Error)
                     throw (Error) t;
             } finally {
-                try {
-                    Thread.currentThread().setName(savedName);
-                    Plugins.compareAndSwapLoaders(savedLoader);
-                    shutdownLatch.countDown();
-                } finally {
-                    try {
-                        releaseResources();
-                    } finally {
-                        taskMetricsGroup.close();
-                    }
-                }
+                Thread.currentThread().setName(savedName);
+                Plugins.compareAndSwapLoaders(savedLoader);
+                shutdownLatch.countDown();
             }
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 33349f4..28326c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -102,9 +102,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         statusListener.onShutdown(taskId);
         expectLastCall();
 
@@ -143,9 +140,6 @@ public class WorkerTaskTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         replay(workerTask);
 
         workerTask.initialize(TASK_CONFIG);
@@ -206,9 +200,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         // there should be no call to onShutdown()
 
         replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e7ffd60..b8cc033 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -603,6 +603,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();
@@ -692,6 +695,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -915,6 +921,9 @@ public class WorkerTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();
@@ -1009,6 +1018,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
 
         PowerMock.replayAll();