You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/30 22:45:20 UTC
[kafka] branch 2.4 updated: KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new ef96ac0 KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)
ef96ac0 is described below
commit ef96ac07f565a73e35c5b0f4c56c8e87cfbaaf59
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Jul 30 17:44:25 2020 -0500
KAFKA-9066: Retain metrics for failed tasks (#8502) (#9106)
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../org/apache/kafka/connect/runtime/Worker.java | 7 +++++-
.../kafka/connect/runtime/WorkerSinkTask.java | 8 +++++--
.../kafka/connect/runtime/WorkerSourceTask.java | 8 +++++--
.../apache/kafka/connect/runtime/WorkerTask.java | 27 ++++++++--------------
.../kafka/connect/runtime/WorkerTaskTest.java | 9 --------
.../apache/kafka/connect/runtime/WorkerTest.java | 12 ++++++++++
6 files changed, 40 insertions(+), 31 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3cf4d01..9c02f8f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -733,13 +733,18 @@ public class Worker {
return;
}
- connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (!task.awaitStop(timeout)) {
log.error("Graceful stop of task {} failed.", task.id());
task.cancel();
} else {
log.debug("Graceful stop of task {} succeeded.", task.id());
}
+
+ try {
+ task.removeMetrics();
+ } finally {
+ connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+ }
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 0fa28e6..c0d9264 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -176,8 +176,12 @@ class WorkerSinkTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sinkTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sinkTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index c3739b5..fdf1266 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -170,8 +170,12 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sourceTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sourceTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 05d18ff..83a7664 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -137,16 +137,17 @@ abstract class WorkerTask implements Runnable {
}
}
+ /**
+ * Remove all metrics published by this task.
+ */
+ public void removeMetrics() {
+ taskMetricsGroup.close();
+ }
+
protected abstract void execute();
protected abstract void close();
- /**
- * Method called when this worker task has been completely closed, and when the subclass should clean up
- * all resources.
- */
- protected abstract void releaseResources();
-
protected boolean isStopping() {
return stopping;
}
@@ -232,17 +233,9 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw (Error) t;
} finally {
- try {
- Thread.currentThread().setName(savedName);
- Plugins.compareAndSwapLoaders(savedLoader);
- shutdownLatch.countDown();
- } finally {
- try {
- releaseResources();
- } finally {
- taskMetricsGroup.close();
- }
- }
+ Thread.currentThread().setName(savedName);
+ Plugins.compareAndSwapLoaders(savedLoader);
+ shutdownLatch.countDown();
}
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 33349f4..28326c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -102,9 +102,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
statusListener.onShutdown(taskId);
expectLastCall();
@@ -143,9 +140,6 @@ public class WorkerTaskTest {
workerTask.close();
EasyMock.expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
replay(workerTask);
workerTask.initialize(TASK_CONFIG);
@@ -206,9 +200,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
// there should be no call to onShutdown()
replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e7ffd60..b8cc033 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -603,6 +603,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();
@@ -692,6 +695,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
// Each time we check the task metrics, the worker will call the herder
herder.taskStatus(TASK_ID);
EasyMock.expectLastCall()
@@ -915,6 +921,9 @@ public class WorkerTest extends ThreadedTest {
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();
@@ -1009,6 +1018,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();