Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 15:41:19 UTC

[GitHub] [kafka] C0urante commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r760307980



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
       Sorry, I didn't mean instantiating the `ErrorHandlingMetrics` object inside the `WorkerTask` class; I meant accepting it as a constructor parameter so that the task can close it during `removeMetrics`. Something like:
   
   ```java
   abstract class WorkerTask implements Runnable {
   
       private final ErrorHandlingMetrics errorMetrics; // NEW
   
       public WorkerTask(ConnectorTaskId id,
                         TaskStatus.Listener statusListener,
                         TargetState initialState,
                         ClassLoader loader,
                         ConnectMetrics connectMetrics,
                         ErrorHandlingMetrics errorMetrics, // NEW
                         RetryWithToleranceOperator retryWithToleranceOperator,
                         Time time,
                         StatusBackingStore statusBackingStore) {
           this.id = id;
           this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
           this.errorMetrics = errorMetrics; // NEW
           this.statusListener = taskMetricsGroup;
           this.loader = loader;
           this.targetState = initialState;
           this.stopping = false;
           this.cancelled = false;
           this.taskMetricsGroup.recordState(this.targetState);
           this.retryWithToleranceOperator = retryWithToleranceOperator;
           this.time = time;
           this.statusBackingStore = statusBackingStore;
       }
   
       public void removeMetrics() {
           // Close quietly here so that we can be sure to close everything even if one attempt fails
           Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group");
           Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW
       }
   }
   ```

   
   How does that look?
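
For the caller side of this suggestion, here is a minimal, self-contained sketch of how the worker might create the metrics object and hand it to the task, so that the task owns its cleanup. Every name in it (`MetricsInjectionSketch`, `ErrorMetrics`, `Task`, `Worker`, `startTask`, `stopTask`) is a hypothetical stand-in rather than the real Connect class or method, and the local `closeQuietly` is a simplified stand-in for `Utils.closeQuietly`. It only illustrates the ownership pattern, not the actual `Worker` code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these types are the real Kafka Connect classes.
class MetricsInjectionSketch {

    // Stand-in for ErrorHandlingMetrics; only the close() contract matters here.
    static class ErrorMetrics implements AutoCloseable {
        private volatile boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Stand-in for WorkerTask: it receives the metrics object instead of creating it.
    static class Task {
        private final ErrorMetrics errorMetrics;

        Task(ErrorMetrics errorMetrics) {
            this.errorMetrics = errorMetrics;
        }

        void removeMetrics() {
            closeQuietly(errorMetrics, "Error handling metrics");
        }
    }

    // Stand-in for Worker: it tracks tasks only, with no second map for metrics.
    static class Worker {
        private final Map<String, Task> tasks = new ConcurrentHashMap<>();

        // Returns the metrics object only so that the demo can inspect it.
        ErrorMetrics startTask(String taskId) {
            ErrorMetrics errorMetrics = new ErrorMetrics();
            tasks.put(taskId, new Task(errorMetrics));
            return errorMetrics;
        }

        void stopTask(String taskId) {
            Task task = tasks.remove(taskId);
            if (task != null) {
                task.removeMetrics();
            }
        }
    }

    // Simplified stand-in for Utils.closeQuietly: log the failure and keep going.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        ErrorMetrics metrics = worker.startTask("connector-0");
        System.out.println("closed before stop: " + metrics.isClosed());
        worker.stopTask("connector-0");
        System.out.println("closed after stop: " + metrics.isClosed());
    }
}
```

Because the task holds the only long-lived reference to its metrics object, removing the task is enough to release it. That would presumably make a separate `errorHandlingMetricsMap` in `Worker` unnecessary.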



