Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 15:41:19 UTC
[GitHub] [kafka] C0urante commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics
C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r760307980
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();
Review comment:
Sorry, I didn't mean instantiating the `ErrorHandlingMetrics` object from within the `WorkerTask` class, but rather accepting it as a constructor parameter so that the task can close it during `removeMetrics`. Something like:
```java
abstract class WorkerTask implements Runnable {

    private final ErrorHandlingMetrics errorMetrics; // NEW

    public WorkerTask(ConnectorTaskId id,
                      TaskStatus.Listener statusListener,
                      TargetState initialState,
                      ClassLoader loader,
                      ConnectMetrics connectMetrics,
                      ErrorHandlingMetrics errorMetrics, // NEW
                      RetryWithToleranceOperator retryWithToleranceOperator,
                      Time time,
                      StatusBackingStore statusBackingStore) {
        this.id = id;
        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
        this.errorMetrics = errorMetrics; // NEW
        this.statusListener = taskMetricsGroup;
        this.loader = loader;
        this.targetState = initialState;
        this.stopping = false;
        this.cancelled = false;
        this.taskMetricsGroup.recordState(this.targetState);
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.time = time;
        this.statusBackingStore = statusBackingStore;
    }

    public void removeMetrics() {
        // Close quietly here so that we can be sure to close everything even if one attempt fails
        Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group");
        Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW
    }
}
```
How does that look?
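
To show the idea in isolation, here is a minimal, self-contained sketch of the ownership pattern: the metrics object is created by the caller, handed to the task through its constructor, and closed by the task in `removeMetrics`, so no separate map keyed by task ID is needed. Every name in it (`MetricsOwnershipSketch`, `FakeErrorMetrics`, `FailingMetricsGroup`, `SketchTask`, and the local `closeQuietly` helper that records failures in a list) is a hypothetical stand-in, not the actual Connect classes or the real `Utils.closeQuietly`.

```java
import java.util.ArrayList;
import java.util.List;

public class MetricsOwnershipSketch {

    // Hypothetical stand-in for ErrorHandlingMetrics: it only remembers whether it was closed.
    static class FakeErrorMetrics implements AutoCloseable {
        private boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Hypothetical stand-in for a metrics group whose close() always fails.
    static class FailingMetricsGroup implements AutoCloseable {
        @Override
        public void close() {
            throw new IllegalStateException("simulated failure");
        }
    }

    // Simplified local helper (an assumption, not Kafka's Utils): swallow any failure
    // from close() and record it, so that later resources still get closed.
    static void closeQuietly(AutoCloseable closeable, String name, List<String> failures) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            failures.add(name + ": " + t.getMessage());
        }
    }

    // Hypothetical task: it receives the error metrics instead of creating them,
    // and is responsible for closing them when its metrics are removed.
    static class SketchTask {
        private final AutoCloseable taskMetricsGroup;
        private final FakeErrorMetrics errorMetrics;
        private final List<String> closeFailures = new ArrayList<>();

        SketchTask(AutoCloseable taskMetricsGroup, FakeErrorMetrics errorMetrics) {
            this.taskMetricsGroup = taskMetricsGroup;
            this.errorMetrics = errorMetrics;
        }

        void removeMetrics() {
            // Each close is attempted independently; a failure in the first does not skip the second.
            closeQuietly(taskMetricsGroup, "Task metrics group", closeFailures);
            closeQuietly(errorMetrics, "Error handling metrics", closeFailures);
        }

        List<String> closeFailures() {
            return closeFailures;
        }
    }

    public static void main(String[] args) {
        FakeErrorMetrics errorMetrics = new FakeErrorMetrics();
        SketchTask task = new SketchTask(new FailingMetricsGroup(), errorMetrics);
        task.removeMetrics();
        System.out.println("error metrics closed: " + errorMetrics.isClosed());
        System.out.println("failures: " + task.closeFailures());
    }
}
```

The point of the sketch is only that the task, not the `Worker`, tracks the lifetime of the error metrics, and that a failure while closing the task metrics group does not prevent the error metrics from being closed.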