You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/08/12 22:42:49 UTC
[kafka] branch 2.5 updated: KAFKA-9066: Retain metrics for failed
tasks (#8502) (#8854)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 89978e1 KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)
89978e1 is described below
commit 89978e1dcfcc8873f2898ae47107b897c2813196
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Wed Aug 12 17:42:12 2020 -0500
KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../org/apache/kafka/connect/runtime/Worker.java | 7 +++++-
.../kafka/connect/runtime/WorkerSinkTask.java | 8 +++++--
.../kafka/connect/runtime/WorkerSourceTask.java | 8 +++++--
.../apache/kafka/connect/runtime/WorkerTask.java | 27 ++++++++--------------
.../kafka/connect/runtime/WorkerTaskTest.java | 9 --------
.../apache/kafka/connect/runtime/WorkerTest.java | 12 ++++++++++
6 files changed, 40 insertions(+), 31 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 99e44d4..266a597 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -733,13 +733,18 @@ public class Worker {
return;
}
- connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (!task.awaitStop(timeout)) {
log.error("Graceful stop of task {} failed.", task.id());
task.cancel();
} else {
log.debug("Graceful stop of task {} succeeded.", task.id());
}
+
+ try {
+ task.removeMetrics();
+ } finally {
+ connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+ }
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8ec6db1..5db4030 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -180,8 +180,12 @@ class WorkerSinkTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sinkTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sinkTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index dc74747..31cbe5f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -175,8 +175,12 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sourceTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sourceTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b0a6a0c..11b0746 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable {
}
}
+ /**
+ * Remove all metrics published by this task.
+ */
+ public void removeMetrics() {
+ taskMetricsGroup.close();
+ }
+
protected abstract void execute();
protected abstract void close();
- /**
- * Method called when this worker task has been completely closed, and when the subclass should clean up
- * all resources.
- */
- protected abstract void releaseResources();
-
protected boolean isStopping() {
return stopping;
}
@@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw (Error) t;
} finally {
- try {
- Thread.currentThread().setName(savedName);
- Plugins.compareAndSwapLoaders(savedLoader);
- shutdownLatch.countDown();
- } finally {
- try {
- releaseResources();
- } finally {
- taskMetricsGroup.close();
- }
- }
+ Thread.currentThread().setName(savedName);
+ Plugins.compareAndSwapLoaders(savedLoader);
+ shutdownLatch.countDown();
}
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 44c45d5..c26a88c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -109,9 +109,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
statusListener.onShutdown(taskId);
expectLastCall();
@@ -153,9 +150,6 @@ public class WorkerTaskTest {
workerTask.close();
EasyMock.expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
replay(workerTask);
workerTask.initialize(TASK_CONFIG);
@@ -219,9 +213,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
// there should be no call to onShutdown()
replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 1e8c0bc..a15dffb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -612,6 +612,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();
@@ -703,6 +706,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
// Each time we check the task metrics, the worker will call the herder
herder.taskStatus(TASK_ID);
EasyMock.expectLastCall()
@@ -929,6 +935,9 @@ public class WorkerTest extends ThreadedTest {
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();
@@ -1025,6 +1034,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
PowerMock.replayAll();