Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/21 11:01:54 UTC

[GitHub] [kafka] ramesh-muthusamy opened a new pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

ramesh-muthusamy opened a new pull request #10910:
URL: https://github.com/apache/kafka/pull/10910


   Issue: 
   We noticed that the error metrics reported by a Kafka Connect worker persist even after a task is redistributed to another worker. As a result, over time the task-error-metrics of a worker come to contain the errors of every task it has ever run.
   This runs counter to how the Kafka Connect worker reports its other metrics. The worker should report error metrics only for the tasks it is currently running, and leave the retention of metrics for previous tasks to the metrics storage system that consumes them.
   In the example below, only 2 tasks are active (task-count is 2.0), yet error metrics are reported for more than 20 tasks.
    
   Task counter mbean:
   {"request":{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"}
   ,"value":{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,"connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,"task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,"connector-count":0.0,"connector-startup-success-percentage":0.0,"task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,"task-count":2.0,"task-startup-failure-total":0.0},"timestamp":1623852927,"status":200}
    
   Task Error metrics mbean: 
   {"request":{"mbean":"kafka.connect:connector=*,task=*,type=task-error-metrics","type":"read"}
   ,"value":{"kafka.connect:connector=***********,task=35,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=**********,task=38,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=14,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=5,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=0,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=29,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=37,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=28,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=25,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=91,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=31,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=7,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=74,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=2,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=26,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=30,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=*********,task=53,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0}
   ,"kafka.connect:connector=**********,task=16,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0, .....
   
   Solution: 
   As part of the fix for KAFKA-12965, this PR introduces code changes to gracefully clean up the error handling metrics associated with a task when the task is closed. This prevents a worker from reporting duplicate metrics for a task that it ran in the past.
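To illustrate the intended behavior, here is a minimal, self-contained Java sketch. It does not use Kafka's real ConnectMetrics API: `MetricRegistry`, `TaskErrorMetrics`, and their methods are hypothetical stand-ins, and the registry is just an in-memory set playing the role of what JMX exposes. Each task's error metrics are registered under a task-scoped name, and closing the group removes that name, so the worker only reports metrics for the tasks it currently runs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TaskErrorMetricsCleanup {

    // Hypothetical stand-in for the worker's metrics registry (what a JMX reader would see).
    static final class MetricRegistry {
        private final Set<String> groups = ConcurrentHashMap.newKeySet();

        void register(String group) { groups.add(group); }

        void remove(String group) { groups.remove(group); }

        Set<String> registered() { return Set.copyOf(groups); }
    }

    // Hypothetical per-task error metrics group: close() removes what the constructor registered.
    static final class TaskErrorMetrics implements AutoCloseable {
        private final MetricRegistry registry;
        private final String group;

        TaskErrorMetrics(MetricRegistry registry, String connector, int task) {
            this.registry = registry;
            this.group = "kafka.connect:connector=" + connector + ",task=" + task + ",type=task-error-metrics";
            registry.register(group);
        }

        @Override
        public void close() {
            registry.remove(group);
        }
    }

    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        TaskErrorMetrics task0 = new TaskErrorMetrics(registry, "my-connector", 0);
        new TaskErrorMetrics(registry, "my-connector", 1);

        // Task 0 is moved to another worker; closing its group removes its metrics here.
        task0.close();

        // prints [kafka.connect:connector=my-connector,task=1,type=task-error-metrics]
        System.out.println(registry.registered());
    }
}
```

Without the `close()` call, task 0's group would stay registered after the rebalance, which is the accumulation described in the issue above.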
   
   Unit tests: not yet covered; in progress.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r764951107



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
##########
@@ -138,4 +142,12 @@ public void recordErrorTimestamp() {
     public ConnectMetrics.MetricGroup metricGroup() {
         return metricGroup;
     }
+
+    /**
+     * Close the task Error metrics group when the task is closed
+     */
+    public void close() {

Review comment:
       Nit:
   ```suggestion
       @Override
       public void close() {
   ```







[GitHub] [kafka] C0urante commented on pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#issuecomment-987595765


   Thanks, LGTM pending unit tests to verify expected behavior and help prevent regressions in the future.





[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
ramesh-muthusamy commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r759823333



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
       Actually, I did explore that option, but unfortunately the worker task is instantiated after the error handling metrics are registered. From an object-lifecycle point of view, I would then need to move the destruction of the error handling metrics into the worker source task while keeping their instantiation where it is.







[GitHub] [kafka] C0urante commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r760307980




##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
       Sorry, I didn't mean instantiating the `ErrorHandlingMetrics` object from within the `WorkerTask` class, but rather, accepting it as a constructor parameter so that it can be closed during `removeMetrics`. Something like:
   
   ```java
   abstract class WorkerTask implements Runnable {
   
       private final ErrorHandlingMetrics errorMetrics; // NEW
   
       public WorkerTask(ConnectorTaskId id,
                         TaskStatus.Listener statusListener,
                         TargetState initialState,
                         ClassLoader loader,
                         ConnectMetrics connectMetrics,
                         ErrorHandlingMetrics errorMetrics, // NEW
                         RetryWithToleranceOperator retryWithToleranceOperator,
                         Time time,
                         StatusBackingStore statusBackingStore) {
           this.id = id;
           this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
           this.errorMetrics = errorMetrics; // NEW
           this.statusListener = taskMetricsGroup;
           this.loader = loader;
           this.targetState = initialState;
           this.stopping = false;
           this.cancelled = false;
           this.taskMetricsGroup.recordState(this.targetState);
           this.retryWithToleranceOperator = retryWithToleranceOperator;
           this.time = time;
           this.statusBackingStore = statusBackingStore;
       }
   
       public void removeMetrics() {
           // Close quietly here so that we can be sure to close everything even if one attempt fails
           Utils.closeQuietly(taskMetricsGroup::close, "Task metrics group");
           Utils.closeQuietly(errorMetrics, "Error handling metrics"); // NEW
       }
   }
   ```
   
   How does that look?
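The ownership pattern proposed above can be exercised in isolation with the runnable sketch below. The classes are simplified, hypothetical stand-ins rather than Kafka's real `WorkerTask` and `ErrorHandlingMetrics`, and `closeQuietly` is a local helper modeled on the intent of Kafka's `Utils.closeQuietly` (log the failure and keep going), not that method itself. It shows two things: the caller creates the error metrics first and hands them to the task, which sidesteps the instantiation-order concern, and `removeMetrics()` still closes the error metrics even when closing the first metrics group throws.

```java
import java.util.ArrayList;
import java.util.List;

public class WorkerTaskMetricsOwnership {

    // Records which metric groups were closed so the behavior is observable.
    static final List<String> CLOSED = new ArrayList<>();

    // Hypothetical local helper: close a resource, log any failure, and continue.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e.getMessage());
        }
    }

    // Hypothetical stand-in for ErrorHandlingMetrics.
    static final class ErrorMetrics implements AutoCloseable {
        @Override
        public void close() {
            CLOSED.add("error-metrics");
        }
    }

    // Hypothetical stand-in for WorkerTask: it receives the error metrics instead of creating them.
    static final class Task {
        private final ErrorMetrics errorMetrics;
        private final boolean failTaskMetricsClose;

        Task(ErrorMetrics errorMetrics, boolean failTaskMetricsClose) {
            this.errorMetrics = errorMetrics;
            this.failTaskMetricsClose = failTaskMetricsClose;
        }

        void removeMetrics() {
            // Closing the task metrics group may fail; the error metrics are still closed afterwards.
            closeQuietly(() -> {
                if (failTaskMetricsClose) {
                    throw new IllegalStateException("simulated failure");
                }
                CLOSED.add("task-metrics");
            }, "Task metrics group");
            closeQuietly(errorMetrics, "Error handling metrics");
        }
    }

    public static void main(String[] args) {
        // The caller builds the error metrics first, then passes them to the task.
        ErrorMetrics errorMetrics = new ErrorMetrics();
        Task task = new Task(errorMetrics, true);
        task.removeMetrics();
        System.out.println(CLOSED); // prints [error-metrics]
    }
}
```

Because the task only holds a reference, the `Worker` no longer needs a separate map of `ErrorHandlingMetrics` keyed by task ID; the metrics live and die with the task that uses them.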







[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
ramesh-muthusamy commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r763065987



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
##########
@@ -23,16 +23,20 @@
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Contains various sensors used for monitoring errors.
  */
-public class ErrorHandlingMetrics {
+public class  ErrorHandlingMetrics {

Review comment:
       Yes, this is reverted.







[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
ramesh-muthusamy commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r763626929



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
       @C0urante I have incorporated the review comments; could you help review?







[GitHub] [kafka] C0urante commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r758864544



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
##########
@@ -23,16 +23,20 @@
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Contains various sensors used for monitoring errors.
  */
-public class ErrorHandlingMetrics {
+public class  ErrorHandlingMetrics {

Review comment:
       Can this be reverted? The existing formatting appears to be correct.
   
   ```suggestion
   public class ErrorHandlingMetrics {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -113,6 +113,7 @@
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ConnectorTaskId, ErrorHandlingMetrics> errorHandlingMetricsMap = new ConcurrentHashMap<>();

Review comment:
       Rather than do this bookkeeping here, could we pass the `ErrorHandlingMetrics` instance to the `WorkerTask` class in its constructor, and then close it in `WorkerTask::removeMetrics`? It'd align nicely with the existing contract for that method, which is that it will "Remove all metrics published by this task."

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
##########
@@ -138,4 +142,12 @@ public void recordErrorTimestamp() {
     public ConnectMetrics.MetricGroup metricGroup() {
         return metricGroup;
     }
+
+    /**
+     * Close the task Error metrics group when the task is closed
+     */
+    public  void closeTaskErrorMetricGroup() {

Review comment:
       Nit: whitespace
   ```suggestion
       public void closeTaskErrorMetricGroup() {
   ```
   
   Also, just curious, any reason we don't want to implement `AutoCloseable` and rename this method to `close`? It'd align nicely with the precedent set in https://github.com/apache/kafka/pull/8442, for example, and would make this class easier to use with [Utils::closeQuietly](https://github.com/apache/kafka/blob/e8dcbb99bb3289193a9036599d87acd56e11499f/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L998-L1009) if we wanted to go that route in the future.
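As a rough illustration of what implementing `AutoCloseable` buys, the sketch below uses a hypothetical `ErrorMetricsGroup` class (not Kafka's real `ErrorHandlingMetrics`). Once `close()` is the `AutoCloseable` method, the group can be managed with try-with-resources or handed to any generic close helper, and `@Override` lets the compiler verify that `close()` really implements the interface method. Declaring `close()` without `throws Exception` keeps callers free of checked-exception handling.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCloseableMetricsDemo {

    // Records lifecycle events so the effect of close() is visible.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical metrics group; implementing AutoCloseable is the suggested change.
    static final class ErrorMetricsGroup implements AutoCloseable {
        private final String taskId;

        ErrorMetricsGroup(String taskId) {
            this.taskId = taskId;
            EVENTS.add("registered " + taskId);
        }

        void recordError() {
            EVENTS.add("error " + taskId);
        }

        @Override // the compiler confirms this implements AutoCloseable.close()
        public void close() {
            EVENTS.add("removed " + taskId);
        }
    }

    public static void main(String[] args) {
        // try-with-resources calls close() automatically, even if the body throws.
        try (ErrorMetricsGroup group = new ErrorMetricsGroup("connector-0")) {
            group.recordError();
        }
        System.out.println(EVENTS); // prints [registered connector-0, error connector-0, removed connector-0]
    }
}
```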







[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
ramesh-muthusamy commented on a change in pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#discussion_r759824560



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
##########
@@ -138,4 +142,12 @@ public void recordErrorTimestamp() {
     public ConnectMetrics.MetricGroup metricGroup() {
         return metricGroup;
     }
+
+    /**
+     * Close the task Error metrics group when the task is closed
+     */
+    public  void closeTaskErrorMetricGroup() {

Review comment:
       Good idea, I will incorporate the recommendation.







[GitHub] [kafka] ramesh-muthusamy commented on pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

Posted by GitBox <gi...@apache.org>.
ramesh-muthusamy commented on pull request #10910:
URL: https://github.com/apache/kafka/pull/10910#issuecomment-923052144


   @kkonstantine can you help review this PR?

