You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/11 03:39:35 UTC
[kafka] 01/02: KAFKA-9066: Retain metrics for failed tasks (#8502)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit eb58b61603e8bdb0ccf94af3f956a09b5591d5b3
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jun 10 20:03:25 2020 -0700
KAFKA-9066: Retain metrics for failed tasks (#8502)
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rh...@gmail.com>
---
.../org/apache/kafka/connect/runtime/Worker.java | 7 +++++-
.../kafka/connect/runtime/WorkerSinkTask.java | 8 +++++--
.../kafka/connect/runtime/WorkerSourceTask.java | 8 +++++--
.../apache/kafka/connect/runtime/WorkerTask.java | 27 ++++++++--------------
.../kafka/connect/runtime/WorkerTaskTest.java | 9 --------
.../apache/kafka/connect/runtime/WorkerTest.java | 12 ++++++++++
.../runtime/WorkerWithTopicCreationTest.java | 12 ++++++++++
7 files changed, 52 insertions(+), 31 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 26b0444..67a27e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -789,13 +789,18 @@ public class Worker {
return;
}
- connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (!task.awaitStop(timeout)) {
log.error("Graceful stop of task {} failed.", task.id());
task.cancel();
} else {
log.debug("Graceful stop of task {} succeeded.", task.id());
}
+
+ try {
+ task.removeMetrics();
+ } finally {
+ connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+ }
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c22ce4a..50efffc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -174,8 +174,12 @@ class WorkerSinkTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sinkTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sinkTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e44d93a..1febd7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -188,8 +188,12 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
- protected void releaseResources() {
- sourceTaskMetricsGroup.close();
+ public void removeMetrics() {
+ try {
+ sourceTaskMetricsGroup.close();
+ } finally {
+ super.removeMetrics();
+ }
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b0a6a0c..11b0746 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable {
}
}
+ /**
+ * Remove all metrics published by this task.
+ */
+ public void removeMetrics() {
+ taskMetricsGroup.close();
+ }
+
protected abstract void execute();
protected abstract void close();
- /**
- * Method called when this worker task has been completely closed, and when the subclass should clean up
- * all resources.
- */
- protected abstract void releaseResources();
-
protected boolean isStopping() {
return stopping;
}
@@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw (Error) t;
} finally {
- try {
- Thread.currentThread().setName(savedName);
- Plugins.compareAndSwapLoaders(savedLoader);
- shutdownLatch.countDown();
- } finally {
- try {
- releaseResources();
- } finally {
- taskMetricsGroup.close();
- }
- }
+ Thread.currentThread().setName(savedName);
+ Plugins.compareAndSwapLoaders(savedLoader);
+ shutdownLatch.countDown();
}
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 44c45d5..c26a88c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -109,9 +109,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
statusListener.onShutdown(taskId);
expectLastCall();
@@ -153,9 +150,6 @@ public class WorkerTaskTest {
workerTask.close();
EasyMock.expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
replay(workerTask);
workerTask.initialize(TASK_CONFIG);
@@ -219,9 +213,6 @@ public class WorkerTaskTest {
workerTask.close();
expectLastCall();
- workerTask.releaseResources();
- EasyMock.expectLastCall();
-
// there should be no call to onShutdown()
replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 48ab58f..5177acf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -602,6 +602,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();
@@ -677,6 +680,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
// Each time we check the task metrics, the worker will call the herder
herder.taskStatus(TASK_ID);
EasyMock.expectLastCall()
@@ -890,6 +896,9 @@ public class WorkerTest extends ThreadedTest {
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();
@@ -964,6 +973,9 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index 8b54033..f698a30 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -594,6 +594,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();
@@ -669,6 +672,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
// Each time we check the task metrics, the worker will call the herder
herder.taskStatus(TASK_ID);
EasyMock.expectLastCall()
@@ -879,6 +885,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();
@@ -953,6 +962,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
+ workerTask.removeMetrics();
+ EasyMock.expectLastCall();
+
expectStopStorage();
expectClusterId();