Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/10 05:39:30 UTC

[GitHub] [kafka] michael-carter-instaclustr opened a new pull request #8844: Kafka 9887 fix failed task or connector count on startup failure

michael-carter-instaclustr opened a new pull request #8844:
URL: https://github.com/apache/kafka/pull/8844


   Moved the responsibility for recording task and connector startup and failure metrics from the Worker class into the status listener that gets passed into the WorkerTask or WorkerConnector. The status listener is decorated so that it records the metrics when onStartup is called, or when onFailure is called before startup has succeeded.
   This gets around the previous issues where these metrics were not being recorded because the WorkerTasks/WorkerConnectors were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads.
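   A minimal sketch of the threading problem described above, not taken from the PR: the caller submits a simulated task start to an executor and returns immediately, so an exception thrown by the start-up code never reaches the caller's try/catch, while a listener invoked on the task's own thread does see it. The `StartupListener` interface, the `task-0` id and the surrounding class are hypothetical stand-ins, not Kafka Connect APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadedStartupDemo {
    // Hypothetical, simplified stand-in for a status listener (not a Kafka API).
    interface StartupListener {
        void onStartup(String id);

        void onFailure(String id, Throwable cause);
    }

    /** Returns {failures seen by the submitting caller, failures seen by the listener}. */
    static int[] run() {
        AtomicInteger callerSeen = new AtomicInteger();
        AtomicInteger listenerSeen = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);

        StartupListener listener = new StartupListener() {
            @Override
            public void onStartup(String id) {
                finished.countDown();
            }

            @Override
            public void onFailure(String id, Throwable cause) {
                listenerSeen.incrementAndGet();
                finished.countDown();
            }
        };

        // Simulated start-up code that fails, e.g. because of a bad configuration.
        Runnable start = () -> {
            throw new IllegalStateException("bad configuration");
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() returns immediately; the start-up code runs later on the executor
            // thread, and only the listener called from that thread learns about the failure.
            executor.submit(() -> {
                try {
                    start.run();
                    listener.onStartup("task-0");
                } catch (RuntimeException e) {
                    listener.onFailure("task-0", e);
                }
            });
        } catch (RuntimeException e) {
            // Not reached in this demo: the start-up exception is thrown on another thread.
            callerSeen.incrementAndGet();
        }

        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new int[] {callerSeen.get(), listenerSeen.get()};
    }

    public static void main(String[] args) {
        int[] seen = run();
        System.out.println("failures seen by caller:   " + seen[0]); // 0
        System.out.println("failures seen by listener: " + seen[1]); // 1
    }
}
```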
   Also split WorkerMetricsGroup out from an inner class into a standalone class, so that the Worker class does not violate Checkstyle's Data Abstraction Coupling rule.
   
   Testing involved adding some unit tests for the decorated listeners to ensure they recorded metrics correctly. Some manual testing was done with Connectors and Tasks that deliberately threw exceptions in the startup phase to ensure JMX metrics were reported correctly.
   Some assertions about startup statistics were removed from the Worker unit tests, since recording these statistics is no longer the direct responsibility of the Worker class.
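   The decorated-listener approach can be sketched as follows. This is a simplified illustration, assuming a trimmed-down hypothetical `Listener` interface and plain `int` counters in place of the `ConnectorStatus.Listener`/`TaskStatus.Listener` interfaces and the Kafka `Sensor`s used in the PR. It follows the same rule as `WorkerMetricsGroupConnectorStatusListener` in the diff below: record a success on `onStartup`, and record a failure only for an `onFailure` that arrives before a successful startup.

```java
import java.util.ArrayList;
import java.util.List;

class StartupMetricsDecoratorDemo {
    // Hypothetical, trimmed-down stand-in for ConnectorStatus.Listener / TaskStatus.Listener.
    interface Listener {
        void onStartup(String id);

        void onFailure(String id, Throwable cause);

        void onShutdown(String id);
    }

    // Plain counters standing in for the startup Sensors (not thread-safe; demo only).
    static final class StartupCounters {
        int attempts;
        int successes;
        int failures;

        void recordSuccess() {
            attempts++;
            successes++;
        }

        void recordFailure() {
            attempts++;
            failures++;
        }
    }

    // Decorator: forwards every callback and records one startup result per listener.
    static final class MetricsRecordingListener implements Listener {
        private final Listener delegate;
        private final StartupCounters counters;
        private boolean startupSucceeded = false;

        MetricsRecordingListener(Listener delegate, StartupCounters counters) {
            this.delegate = delegate;
            this.counters = counters;
        }

        @Override
        public void onStartup(String id) {
            delegate.onStartup(id);
            startupSucceeded = true;
            counters.recordSuccess();
        }

        @Override
        public void onFailure(String id, Throwable cause) {
            if (!startupSucceeded) {
                counters.recordFailure(); // failed before ever starting
            }
            delegate.onFailure(id, cause);
        }

        @Override
        public void onShutdown(String id) {
            delegate.onShutdown(id);
        }
    }

    static StartupCounters runScenario(List<String> events) {
        Listener recorder = new Listener() {
            @Override
            public void onStartup(String id) { events.add("startup " + id); }

            @Override
            public void onFailure(String id, Throwable cause) { events.add("failure " + id); }

            @Override
            public void onShutdown(String id) { events.add("shutdown " + id); }
        };
        StartupCounters counters = new StartupCounters();

        // Connector "a": start() throws, so onFailure arrives before any onStartup.
        Listener a = new MetricsRecordingListener(recorder, counters);
        a.onFailure("a", new IllegalStateException("bad config"));

        // Connector "b": starts successfully, then fails later while running.
        Listener b = new MetricsRecordingListener(recorder, counters);
        b.onStartup("b");
        b.onFailure("b", new IllegalStateException("lost connection"));
        return counters;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        StartupCounters c = runScenario(events);
        // Prints: attempts=2 successes=1 failures=1
        System.out.println("attempts=" + c.attempts + " successes=" + c.successes + " failures=" + c.failures);
        System.out.println(events); // [failure a, startup b, failure b]
    }
}
```

   The per-listener `startupSucceeded` flag is what keeps connector "b"'s later runtime failure from being counted as a startup failure, while every callback still reaches the wrapped listener.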
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443386551



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapTaskStatusListener(TaskStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupTaskStatusListener(delegateListener);
+    }
+
+    class WorkerMetricsGroupConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;
+
+        WorkerMetricsGroupConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            delegateListener.onStartup(connector);
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class WorkerMetricsGroupTaskStatusListener implements TaskStatus.Listener {

Review comment:
       It was a little close to TaskStatus.Listener for me originally, but yeah, I agree the name is cumbersome. I'm happy to rename it.







[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-697142928


   Hey @C0urante, I'm not sure how long it usually takes for a committer to get around to looking at things like this (maybe several months is normal), but I just thought I'd check with you: do you have any tips on how to get a committer's attention?





[GitHub] [kafka] C0urante commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443742947



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -289,7 +285,6 @@ public void startConnector(
                 // Can't be put in a finally block because it needs to be swapped before the call on
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
-                workerMetricsGroup.recordConnectorStartupFailure();

Review comment:
       Oooh, yeah that's neat. 👍 







[GitHub] [kafka] mjsax commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-661445423


   I am not really familiar with connect code base. Would leave it to @rhauch or @kkonstantine to have a look.





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443386418



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    class WorkerMetricsGroupConnectorStatusListener implements ConnectorStatus.Listener {

Review comment:
       It was a little close to ConnectorStatus.Listener for me originally, but yeah, I agree the name is cumbersome. I'm happy to rename it.







[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-649196880


   I've made changes to those tests now, @C0urante. A couple of things worth noting: changing the mock of the WorkerMetricsGroup to a real object needs a fair few more mocks that relate to each other, so I've organised those into a @Before method instead of injecting them via a @Mock annotation; I hope that's okay. Doing so had the benefit that the tests are now more explicit about the values of the metrics being recorded, which may make the deletion of lines in WorkerTest more palatable.
   
   Looking at the tests I'm modifying there, most of them already have expectations on the statusListener being called, so I think in that case it's fair to say that the responsibility has simply moved to the WorkerMetricsGroup class (and therefore to WorkerMetricsGroupTest). For the test that doesn't seem to have any expectations on the status listener (testAddRemoveTask), I believe this might be because it's the WorkerTask's job to call the status listener once it's running, but that aspect is mocked away in that particular test.





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-648002707


   OK, I've made the changes that we discussed.





[GitHub] [kafka] rhauch commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-883512927


   Thanks again, @michael-carter-instaclustr. And I apologize for the unacceptably long delay on my part with this PR.
   
   FYI: I merged from `trunk` to get the `onRestart(...)` method that KIP-745 (PR #10822) added recently (prior to the creation of the `3.0` branch). I then implemented these methods by simply calling the delegate, as with the other methods. Simple enough change, but necessary before merging.
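   As a rough illustration of the merge note above: a decorating listener has to forward every callback of the interface it wraps, so a newly added method such as `onRestart(...)` needs its own pass-through. This sketch uses a hypothetical two-method `Listener`, not the real `ConnectorStatus.Listener`, which has more callbacks.

```java
import java.util.ArrayList;
import java.util.List;

class DelegatingListenerDemo {
    // Hypothetical, trimmed-down listener interface that includes an onRestart callback.
    interface Listener {
        void onStartup(String connector);

        void onRestart(String connector);
    }

    // Decorator that adds behaviour to onStartup and simply passes onRestart through.
    static final class Decorator implements Listener {
        private final Listener delegate;
        int startupsRecorded = 0;

        Decorator(Listener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onStartup(String connector) {
            delegate.onStartup(connector);
            startupsRecorded++;
        }

        @Override
        public void onRestart(String connector) {
            // No metrics involved: forward to the wrapped listener unchanged.
            delegate.onRestart(connector);
        }
    }

    static List<String> forwardedEvents() {
        List<String> events = new ArrayList<>();
        Listener recorder = new Listener() {
            @Override
            public void onStartup(String connector) { events.add("startup:" + connector); }

            @Override
            public void onRestart(String connector) { events.add("restart:" + connector); }
        };
        Listener decorated = new Decorator(recorder);
        decorated.onStartup("my-connector");
        decorated.onRestart("my-connector");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(forwardedEvents()); // [startup:my-connector, restart:my-connector]
    }
}
```

   If the wrapped interface declares the new method as abstract, a decorator that omits it fails to compile; if it were a `default` method instead, the decorator would compile but the call would never reach the delegate. Either way, the explicit pass-through is what keeps the decorator transparent.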





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-645688407


   retest this please





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-644443698


   The failing unit test kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles is unrelated to this pull request and is tracked by:
   https://issues.apache.org/jira/browse/KAFKA-10155
   https://issues.apache.org/jira/browse/KAFKA-10147





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-647356784


   Thanks for reviewing this @C0urante.
   
   "if the framework successfully instantiates a connector and is able to call start on it, should that alone qualify as a "successful" startup, or does the call to start also have to go off without a hitch?" --- That’s an interesting point, and not one that I'd considered. My fundamental assumption in approaching this was that the ‘connector-startup-failure-total’ metric, described in the KIP as ‘The total number of connector starts that failed’, was intended to be a numerical record of failures within the ‘start’ method of the connector (and likewise for the task-based metrics). In other words, these metrics represent the health of the worker in an integration sense (e.g. does the worker have the right connectivity to do its job? Are people submitting valid configurations, or do the users of Connect not understand how to use it?). To me this seems like a useful aggregate metric that relates to the use of the worker as a whole, more than a record of any individual connector failure.
   The way I encountered this was that someone on my team was attempting to run a CloudWatch connector but hadn’t got the configuration quite right, so the ‘start’ method would throw an exception and the connector would enter the failed state every time. However, the worker recorded this as a successful startup (which we all felt to be a bit misleading). Given my interpretation of the metric’s intention, this confused the investigation into what the problem was, and once we found the cause, it seemed like a bug in the metric (and a big surprise).
   FWIW, it still seems like the most natural interpretation of the metric to me, but if this isn’t the intention, then I’d suggest renaming the metric to something that disambiguates it from the connector’s ‘start’ method, e.g. connector-start-preparation-failure-total or something like that (and similarly for task startups).
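   To make the two readings of the metric concrete, here is a simplified model (not Kafka code) of the misconfiguration scenario above. The `ExampleConnector` class and its required `region` setting are hypothetical. One counter increments as soon as the connector object exists (roughly the behaviour the comment found misleading); the other two reflect whether `start()` actually returned normally.

```java
import java.util.Map;

class StartupSemanticsDemo {
    // Hypothetical connector whose start() validates its configuration.
    static final class ExampleConnector {
        void start(Map<String, String> config) {
            if (!config.containsKey("region")) {
                throw new IllegalArgumentException("missing required setting: region");
            }
        }
    }

    static final class Counts {
        int handOffSuccesses; // counted once the connector has been instantiated
        int startSuccesses;   // counted only if start() returns normally
        int startFailures;    // counted if start() throws
    }

    static void attemptStart(Map<String, String> config, Counts counts) {
        ExampleConnector connector = new ExampleConnector();
        counts.handOffSuccesses++; // instantiation alone "succeeded"
        try {
            connector.start(config);
            counts.startSuccesses++;
        } catch (RuntimeException e) {
            counts.startFailures++;
        }
    }

    public static void main(String[] args) {
        Counts counts = new Counts();
        for (int i = 0; i < 3; i++) {
            attemptStart(Map.of(), counts); // misconfigured: every start() throws
        }
        System.out.println("hand-off successes: " + counts.handOffSuccesses); // 3
        System.out.println("start() successes:  " + counts.startSuccesses);   // 0
        System.out.println("start() failures:   " + counts.startFailures);    // 3
    }
}
```

   Under the first reading, a connector that can never start still looks healthy; under the second, every attempt shows up as a failure, which is what the comment argues the startup-failure metric should report.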





[GitHub] [kafka] gwenshap commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
gwenshap commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-643911650


   ok to test





[GitHub] [kafka] rhauch merged pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8844:
URL: https://github.com/apache/kafka/pull/8844


   





[GitHub] [kafka] C0urante commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443300068



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    TaskStatus.Listener wrapTaskStatusListener(TaskStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupTaskStatusListener(delegateListener);
+    }
+
+    class WorkerMetricsGroupConnectorStatusListener implements ConnectorStatus.Listener {

Review comment:
       Another nit (sorry!): given that this is already an inner class for the `WorkerMetricsGroup` class, the `WorkerMetricsGroup` prefix seems redundant. What do you think about just `ConnectorStatusListener`?
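A minimal sketch of the decorator described in the PR, showing why the inner class can simply be called `ConnectorStatusListener`. It is illustrative only: `StatusListener`, `MetricsGroupSketch` and the `AtomicInteger` counters are hypothetical stand-ins for `ConnectorStatus.Listener`, `WorkerMetricsGroup` and the Kafka sensors, and the real listener interface has more callbacks. A failure counts as a startup failure only if it arrives before `onStartup`, which matches the PR description.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for ConnectorStatus.Listener; the real
// Connect interface has more callbacks (pause, resume, shutdown, ...).
interface StatusListener {
    void onStartup(String connector);
    void onFailure(String connector, Throwable cause);
}

// Hypothetical stand-in for WorkerMetricsGroup; plain counters replace the Kafka sensors.
class MetricsGroupSketch {
    final AtomicInteger startupSuccesses = new AtomicInteger();
    final AtomicInteger startupFailures = new AtomicInteger();

    void recordConnectorStartupSuccess() {
        startupSuccesses.incrementAndGet();
    }

    void recordConnectorStartupFailure() {
        startupFailures.incrementAndGet();
    }

    StatusListener wrapStatusListener(StatusListener delegate) {
        return new ConnectorStatusListener(delegate);
    }

    // The enclosing class already supplies the "metrics group" context,
    // so the inner class needs no WorkerMetricsGroup prefix.
    class ConnectorStatusListener implements StatusListener {
        private final StatusListener delegate;
        private volatile boolean startupSucceeded = false;

        ConnectorStatusListener(StatusListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onStartup(String connector) {
            startupSucceeded = true;
            recordConnectorStartupSuccess();
            delegate.onStartup(connector);
        }

        @Override
        public void onFailure(String connector, Throwable cause) {
            // Only a failure that happens before a successful startup
            // is counted as a startup failure.
            if (!startupSucceeded) {
                recordConnectorStartupFailure();
            }
            delegate.onFailure(connector, cause);
        }
    }
}
```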




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-811661662


   Thanks for having a look at this, @rhauch. I've updated the PR with the suggested log messages and reordered the methods and lines in the ConnectorStatusListener. I've also put back most of the assertStartupStatistics calls. There were a couple I couldn't keep without breaking the unit tests. I believe this is because the statistic is now recorded by code submitted to the executor service, rather than being set directly by the startTask method. The only way I could think of to keep them was to fake an executor service that runs tasks as soon as they are submitted, but since that isn't how it works in reality, it didn't seem like the right thing to do. Neither did keeping them and just asserting that they were 0 the whole time.
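The rejected alternative, an executor that runs work as soon as it is submitted, is often called a direct or same-thread executor. Below is a minimal sketch built on `java.util.concurrent.AbstractExecutorService`. The class name `DirectExecutorService` is hypothetical and nothing like it is part of this PR. Because `submit` completes the task on the caller's thread before returning, a metric recorded inside the submitted code would be visible immediately. That is also why it does not reflect the real, asynchronous Worker behaviour.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: runs every submitted task synchronously on the
// calling thread, so submit(...) returns an already-completed Future.
class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        // Nothing is ever queued, so there are no pending tasks to return.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Tasks run inline on the caller's thread; this sketch ignores
        // execute() calls that may still be in progress on other threads.
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }
}
```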





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443385901



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapConnectorStatusListener(ConnectorStatus.Listener delegateListener) {

Review comment:
       Yep, that sounds fair. I'll rename it (pending the discussion on direction overall).

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    ConnectorStatus.Listener wrapConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapTaskStatusListener(TaskStatus.Listener delegateListener) {

Review comment:
       Yep, that sounds fair. I'll rename it (pending the discussion on direction overall).







[GitHub] [kafka] C0urante commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-697324142


   Hi @michael-carter-instaclustr, unfortunately it can take some time. Usually I tag @rhauch and @kkonstantine on GitHub since they're the committers who work most closely with the Connect framework, but I see you've already done that. You could try reaching out to the dev mailing list and asking for a review there? This is a good bug fix and deserves a look; ideally, it shouldn't take months to get something like this reviewed.





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444716410



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);

Review comment:
       Yeah, I think it should be fine to do it with annotated class-level fields.







[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-647205060


   Is this something you could review, @C0urante?





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444716683



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(connector);
+        expectLastCall();
+
+        mockWorkerMetricsGroup.recordConnectorStartupSuccess();
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        connectorListener.onStartup(connector);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureAfterStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(eq(connector));
+        expectLastCall();
+
+        delegate.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        mockWorkerMetricsGroup.recordConnectorStartupSuccess();

Review comment:
       No argument from me. I'll change it.







[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443385639



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -562,7 +556,6 @@ public boolean startTask(
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();

Review comment:
       Good point. I believe we could achieve the same effect by wrapping the statusListener earlier in the method and using the wrapped listener in the exception handler. That way, the only way we could fail without recording a metric would be if the wrapping itself failed, and since that is a dead-simple constructor call, it seems unlikely.
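Below is a minimal sketch of the ordering proposed above, with simplified types. `TaskListener`, `TaskFactory`, `MetricsSketch` and its `startTask` method are hypothetical and only mirror the rough shape of `Worker.startTask`. The listener is wrapped before anything that can throw, and the catch block reports through the wrapped listener. As a result, the failure metric is recorded even when task construction fails before anything is submitted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for TaskStatus.Listener.
interface TaskListener {
    void onStartup(String taskId);
    void onFailure(String taskId, Throwable cause);
}

// Hypothetical task factory; may throw, like task construction in the Worker.
interface TaskFactory {
    Runnable create(String taskId, TaskListener listener);
}

class MetricsSketch {
    final AtomicInteger taskSuccesses = new AtomicInteger();
    final AtomicInteger taskFailures = new AtomicInteger();

    TaskListener wrapStatusListener(TaskListener delegate) {
        return new TaskListener() {
            private volatile boolean started = false;

            @Override
            public void onStartup(String taskId) {
                started = true;
                taskSuccesses.incrementAndGet();
                delegate.onStartup(taskId);
            }

            @Override
            public void onFailure(String taskId, Throwable cause) {
                // Count only failures that happen before startup.
                if (!started) {
                    taskFailures.incrementAndGet();
                }
                delegate.onFailure(taskId, cause);
            }
        };
    }

    boolean startTask(String taskId, TaskListener statusListener, TaskFactory factory) {
        // Wrap first: this is a trivial constructor call, so it is very unlikely to fail.
        TaskListener wrapped = wrapStatusListener(statusListener);
        try {
            Runnable task = factory.create(taskId, wrapped);
            // The real Worker would submit the task to an executor here;
            // the sketch simply runs it inline.
            task.run();
            return true;
        } catch (Throwable t) {
            // Report through the wrapped listener so the failure metric is recorded.
            wrapped.onFailure(taskId, t);
            return false;
        }
    }
}
```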







[GitHub] [kafka] C0urante commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443299808



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    ConnectorStatus.Listener wrapConnectorStatusListener(ConnectorStatus.Listener delegateListener) {

Review comment:
       Nit: the name here is a bit verbose. The type signature of the parameter here already tells us that this is for a connector status listener; do you think `wrapStatusListener` or even just `statusListener` might convey the necessary information?
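The two wrapper methods take different parameter types, so Java overload resolution can choose between them from the argument's static type. That is what makes a single short name like `wrapStatusListener` workable. Below is a minimal sketch under simplified assumptions. `ConnectorListener`, `TaskListener` and `Metrics` are hypothetical stand-ins for `ConnectorStatus.Listener`, `TaskStatus.Listener` and `WorkerMetricsGroup`, each reduced to one callback, and the wrappers here only count startups.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for ConnectorStatus.Listener and TaskStatus.Listener,
// reduced to a single callback each.
interface ConnectorListener {
    void onStartup(String connector);
}

interface TaskListener {
    void onStartup(String taskId);
}

// Hypothetical metrics holder with two overloads sharing one short name.
class Metrics {
    final AtomicInteger connectorStartups = new AtomicInteger();
    final AtomicInteger taskStartups = new AtomicInteger();

    // Chosen when the argument's static type is ConnectorListener.
    ConnectorListener wrapStatusListener(ConnectorListener delegate) {
        return connector -> {
            connectorStartups.incrementAndGet();
            delegate.onStartup(connector);
        };
    }

    // Chosen when the argument's static type is TaskListener.
    TaskListener wrapStatusListener(TaskListener delegate) {
        return taskId -> {
            taskStartups.incrementAndGet();
            delegate.onStartup(taskId);
        };
    }
}
```

One caveat of overloading is that arguments whose type does not pick out either overload become compile-time errors. Examples are a bare `null` or, in this simplified sketch, an implicitly typed lambda. Callers would need a cast or a typed variable in those cases, which should rarely matter for internal call sites like these.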

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapTaskStatusListener(TaskStatus.Listener delegateListener) {
+        return new WorkerMetricsGroupTaskStatusListener(delegateListener);
+    }
+
+    class WorkerMetricsGroupConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;
+
+        WorkerMetricsGroupConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            delegateListener.onStartup(connector);
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class WorkerMetricsGroupTaskStatusListener implements TaskStatus.Listener {

Review comment:
       Same (nitty) comment here: maybe just `TaskStatusListener`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    class WorkerMetricsGroupConnectorStatusListener implements ConnectorStatus.Listener {

Review comment:
      Another nit (sorry!): given that this is already an inner class of the `WorkerMetricsGroup` class, the `WorkerMetricsGroup` prefix seems redundant. What do you think about just `ConnectorStatusListener`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    TaskStatus.Listener wrapTaskStatusListener(TaskStatus.Listener delegateListener) {

Review comment:
       Same comment here w/r/t naming; I think `wrapStatusListener` or `statusListener` may be warranted.
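A small, self-contained sketch of the overloading idea behind these naming suggestions. It assumes simplified stand-in interfaces: the `ConnectorStatus`/`TaskStatus` holders, `MetricsGroup`, and the recorded event strings are hypothetical, not the real Connect types. Because the two listener types are unrelated, Java picks the `wrapStatusListener` overload from the static type of the argument, so one name can serve both connectors and tasks, and the inner classes can drop the redundant prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: ConnectorStatus and TaskStatus are simplified,
// hypothetical stand-ins for the real Kafka Connect listener types.
public class OverloadedWrapSketch {

    static class ConnectorStatus {
        interface Listener {
            void onStartup(String connector);
            void onFailure(String connector, Throwable cause);
        }
    }

    static class TaskStatus {
        interface Listener {
            void onStartup(Integer taskId);
            void onFailure(Integer taskId, Throwable cause);
        }
    }

    static class MetricsGroup {
        final List<String> recorded = new ArrayList<>();

        // One name for both listener kinds; the overload is chosen at compile
        // time from the static type of the argument.
        ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegate) {
            return new ConnectorStatusListener(delegate);
        }

        TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegate) {
            return new TaskStatusListener(delegate);
        }

        // Inner class names without the redundant outer-class prefix.
        class ConnectorStatusListener implements ConnectorStatus.Listener {
            private final ConnectorStatus.Listener delegate;

            ConnectorStatusListener(ConnectorStatus.Listener delegate) { this.delegate = delegate; }

            @Override public void onStartup(String connector) {
                delegate.onStartup(connector);
                recorded.add("connector-startup");
            }

            @Override public void onFailure(String connector, Throwable cause) {
                delegate.onFailure(connector, cause);
                recorded.add("connector-failure");
            }
        }

        class TaskStatusListener implements TaskStatus.Listener {
            private final TaskStatus.Listener delegate;

            TaskStatusListener(TaskStatus.Listener delegate) { this.delegate = delegate; }

            @Override public void onStartup(Integer taskId) {
                delegate.onStartup(taskId);
                recorded.add("task-startup");
            }

            @Override public void onFailure(Integer taskId, Throwable cause) {
                delegate.onFailure(taskId, cause);
                recorded.add("task-failure");
            }
        }
    }

    static List<String> demo() {
        MetricsGroup group = new MetricsGroup();
        ConnectorStatus.Listener connectorListener = new ConnectorStatus.Listener() {
            @Override public void onStartup(String connector) { }
            @Override public void onFailure(String connector, Throwable cause) { }
        };
        TaskStatus.Listener taskListener = new TaskStatus.Listener() {
            @Override public void onStartup(Integer taskId) { }
            @Override public void onFailure(Integer taskId, Throwable cause) { }
        };
        // Same method name, different overloads selected by argument type.
        group.wrapStatusListener(connectorListener).onStartup("my-connector");
        group.wrapStatusListener(taskListener).onFailure(0, new RuntimeException("boom"));
        return group.recorded;
    }
}
```

One caveat: if the listener types were single-method (functional) interfaces, passing an implicitly typed lambda to `wrapStatusListener` would be ambiguous. The real listeners declare several callbacks, so that should not arise here.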

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -289,7 +285,6 @@ public void startConnector(
                 // Can't be put in a finally block because it needs to be swapped before the call on
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
-                workerMetricsGroup.recordConnectorStartupFailure();

Review comment:
       I think we might want to keep this line here in case we fail somehow before even creating the `WorkerConnector` instance.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -56,7 +56,7 @@
     private static final String THREAD_NAME_PREFIX = "task-thread-";
 
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
      It seems a little unclean to start calling the `statusListener` from the subclass when it's been used exclusively by the `WorkerTask` abstract class up to this point. Not the end of the world but I think we might be able to do this more cleanly by decomposing the existing `execute` method into separate `initializeAndStart` (name obviously subject to change) and `execute` methods, with the call to `statusListener::onStartup` in between them. This way, we can avoid worrying about the status listener in the subclasses and can encapsulate some shared logic in the abstract superclass. What are your thoughts?
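To make the suggested decomposition concrete, here is a minimal self-contained sketch. It assumes a simplified `StatusListener` interface and `Task` base class in place of `TaskStatus.Listener` and `WorkerTask` (both names are hypothetical). The base class owns the listener, calls `onStartup` only after `initializeAndStart()` returns normally, and subclasses implement just the two hooks.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the suggested split; StatusListener and Task are
// hypothetical stand-ins for TaskStatus.Listener and WorkerTask.
public class TemplateMethodSketch {

    interface StatusListener {
        void onStartup(String id);
        void onFailure(String id, Throwable cause);
    }

    abstract static class Task implements Runnable {
        private final String id;
        // Stays private: subclasses never touch the listener directly.
        private final StatusListener statusListener;

        Task(String id, StatusListener statusListener) {
            this.id = id;
            this.statusListener = statusListener;
        }

        protected abstract void initializeAndStart();

        protected abstract void execute();

        @Override
        public final void run() {
            try {
                initializeAndStart();
                // Only reached if startup did not throw.
                statusListener.onStartup(id);
                execute();
            } catch (Throwable t) {
                // Simplification: the doRun() sketch in this thread also logs, rethrows, and closes.
                statusListener.onFailure(id, t);
            }
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        StatusListener listener = new StatusListener() {
            @Override public void onStartup(String id) { events.add(id + ":startup"); }
            @Override public void onFailure(String id, Throwable cause) {
                events.add(id + ":failure:" + cause.getMessage());
            }
        };
        new Task("ok", listener) {
            @Override protected void initializeAndStart() { }
            @Override protected void execute() { events.add("ok:execute"); }
        }.run();
        new Task("bad", listener) {
            @Override protected void initializeAndStart() { throw new IllegalStateException("start failed"); }
            @Override protected void execute() { events.add("bad:execute"); }
        }.run();
        return events;
    }
}
```

Because `onStartup` is only ever called from the superclass, a decorator that treats a failure as a startup failure only when it arrives before `onStartup` should keep working unchanged for every subclass.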

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -562,7 +556,6 @@ public boolean startTask(
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();

Review comment:
       I think we might want to keep this line here in case we fail somehow before even creating the `WorkerTask` instance. This can happen if a `Converter`, `Transformation`, etc. throws an exception during startup.
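A rough sketch of why the direct failure-recording call is still needed, assuming hypothetical `StartupMetrics`, `StatusListener`, and `startTask` stand-ins for the real `WorkerMetricsGroup`, `TaskStatus.Listener`, and `Worker::startTask`. If building the task (for example a `Converter` or `Transformation`) throws, no task ever calls the wrapped listener, so the failure has to be recorded in the catch block. In this sketch the catch block notifies the original, unwrapped listener, so the failure is counted only once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative sketch: StatusListener, StartupMetrics, and startTask are hypothetical
// stand-ins for TaskStatus.Listener, WorkerMetricsGroup, and Worker::startTask.
public class PreConstructionFailureSketch {

    interface StatusListener {
        void onStartup(String id);
        void onFailure(String id, Throwable cause);
    }

    static final class StartupMetrics {
        final AtomicInteger attempts = new AtomicInteger();
        final AtomicInteger successes = new AtomicInteger();
        final AtomicInteger failures = new AtomicInteger();

        void recordFailure() { attempts.incrementAndGet(); failures.incrementAndGet(); }
        void recordSuccess() { attempts.incrementAndGet(); successes.incrementAndGet(); }

        // Decorator: a failure counts as a startup failure only if it arrives before onStartup.
        StatusListener wrapStatusListener(StatusListener delegate) {
            return new StatusListener() {
                private boolean startupSucceeded = false;

                @Override public void onStartup(String id) {
                    delegate.onStartup(id);
                    startupSucceeded = true;
                    recordSuccess();
                }

                @Override public void onFailure(String id, Throwable cause) {
                    if (!startupSucceeded) recordFailure();
                    delegate.onFailure(id, cause);
                }
            };
        }
    }

    // buildTask models constructing converters, transformations, and the task itself,
    // any of which may throw before a task object exists.
    static boolean startTask(String id, Function<StatusListener, Runnable> buildTask,
                             StatusListener statusListener, StartupMetrics metrics) {
        Runnable task;
        try {
            task = buildTask.apply(metrics.wrapStatusListener(statusListener));
        } catch (Throwable t) {
            // No task will ever call the wrapped listener, so record the failure here and
            // notify the original (unwrapped) listener so it is not counted twice.
            metrics.recordFailure();
            statusListener.onFailure(id, t);
            return false;
        }
        task.run(); // simplification: the real worker submits the task to an executor
        return true;
    }

    static int[] demo() {
        StartupMetrics metrics = new StartupMetrics();
        StatusListener herderListener = new StatusListener() {
            @Override public void onStartup(String id) { }
            @Override public void onFailure(String id, Throwable cause) { }
        };
        // Construction succeeds; the task reports startup through the wrapped listener.
        startTask("task-0", listener -> () -> listener.onStartup("task-0"), herderListener, metrics);
        // Construction fails (e.g. a converter throws) before any task exists.
        startTask("task-1", listener -> { throw new IllegalStateException("bad converter"); },
                herderListener, metrics);
        return new int[] {metrics.attempts.get(), metrics.successes.get(), metrics.failures.get()};
    }
}
```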




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443756424



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -56,7 +56,7 @@
     private static final String THREAD_NAME_PREFIX = "task-thread-";
 
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
      Yeah, we definitely don't want to run connector or task code on the same thread as the `Worker` (or really, the `Herder` that's calling the `Worker`). I don't think it's that bad if the startup and recording of metrics happen asynchronously from the call to `Worker::startConnector` or `Worker::startTask`; the only potential downside I can think of is that someone might believe they've started a connector/task but the startup metrics for the worker might not yet be incremented if the connector/task is taking a while (or just completely hung) during startup.
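A small sketch of that asynchronous-visibility caveat, assuming a hypothetical `startTask` that, like the worker, only submits the task to an executor and returns. A latch holds the task in its startup phase, so the counter is guaranteed to still read 0 right after `startTask` returns, and only reads 1 once startup has actually finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: startTask is a hypothetical stand-in for Worker::startTask,
// which hands the task to an executor and returns without waiting for startup.
public class AsyncStartupMetricsSketch {

    static Future<?> startTask(ExecutorService executor, CountDownLatch startupGate,
                               AtomicInteger startupSuccesses) {
        return executor.submit(() -> {
            try {
                startupGate.await(); // simulates a task that is slow to start
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            startupSuccesses.incrementAndGet(); // recorded on the task thread
        });
    }

    static int[] demo() {
        AtomicInteger startupSuccesses = new AtomicInteger();
        CountDownLatch startupGate = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = startTask(executor, startupGate, startupSuccesses);
            // startTask has returned, but the task is still blocked in startup.
            int beforeStartup = startupSuccesses.get();
            startupGate.countDown();
            task.get(); // wait for startup to finish
            int afterStartup = startupSuccesses.get();
            return new int[] {beforeStartup, afterStartup};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```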
   
   I was just suggesting a small refactoring though, not anything that would affect the actual behavior of the framework. Just something to make the code a little cleaner.
   
   In case it helps, I was thinking `WorkerTask` might look like this:
   
   ```java
   abstract class WorkerTask {
       private final TaskStatus.Listener statusListener;
   
       protected abstract void initializeAndStart();
   
       protected abstract void execute();
   
       private void doRun() throws InterruptedException {
           try {
               synchronized (this) {
                   if (stopping)
                       return;
   
                   if (targetState == TargetState.PAUSED) {
                       onPause();
                       if (!awaitUnpause()) return;
                   }
               }
   
               // These three lines replace the single call to execute() that's in the WorkerTask class right now
               initializeAndStart();
               statusListener.onStartup(id);
               execute();
           } catch (Throwable t) {
               log.error("{} Task threw an uncaught and unrecoverable exception", this, t);
               log.error("{} Task is being killed and will not recover until manually restarted", this);
               throw t;
           } finally {
               doClose();
           }
       }
   }
   
   ```
   
   `WorkerSinkTask` might look like this:
   
   ```java
   class WorkerSinkTask extends WorkerTask {
   
       // This is already a method in the WorkerSinkTask class, but now it overrides an abstract method in the WorkerTask superclass
       @Override
       protected void initializeAndStart() {
           SinkConnectorConfig.validate(taskConfig);
   
           if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
               List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
               consumer.subscribe(topics, new HandleRebalance());
               log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
           } else {
               String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
               Pattern pattern = Pattern.compile(topicsRegexStr);
               consumer.subscribe(pattern, new HandleRebalance());
               log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
           }
   
           task.initialize(context);
           task.start(taskConfig);
           log.info("{} Sink task finished initialization and start", this);
       }
   
       // Remove the call to initializeAndStart() and statusListener.onStartup() here; they'll be called automatically by the superclass
       @Override
       public void execute() {
           // Make sure any uncommitted data has been committed and the task has
           // a chance to clean up its state
           try (UncheckedCloseable suppressible = this::closePartitions) {
               while (!isStopping())
                   iteration();
           }
       }
   }
   ```
   
   And `WorkerSourceTask` might look like this:
   
   ```java
   class WorkerSourceTask extends WorkerTask {
   
       // Technically a new method, but all code has just been cut+pasted from the existing execute() method
       @Override
       protected void initializeAndStart() {
           task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
           task.start(taskConfig);
           log.info("{} Source task finished initialization and start", this);
       }
    
       // Same as the existing execute() method, except for the code removed for initializeAndStart() and the call to statusListener.onStartup
       @Override
       public void execute() {
           synchronized (this) {
               if (startedShutdownBeforeStartCompleted) {
                   tryStop();
                   return;
               }
               finishedStart = true;
           }
   
           try {
               while (!isStopping()) {
                   if (shouldPause()) {
                       onPause();
                       if (awaitUnpause()) {
                           onResume();
                       }
                       continue;
                   }
   
                   maybeThrowProducerSendException();
   
                   if (toSend == null) {
                       log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                       long start = time.milliseconds();
                       toSend = poll();
                       if (toSend != null) {
                           recordPollReturned(toSend.size(), time.milliseconds() - start);
                       }
                   }
                   if (toSend == null)
                       continue;
                   log.trace("{} About to send {} records to Kafka", this, toSend.size());
                   if (!sendRecords())
                       stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
               }
           } catch (InterruptedException e) {
               // Ignore and allow to exit.
           } finally {
               // It should still be safe to commit offsets since any exception would have
               // simply resulted in not getting more records but all the existing records should be ok to flush
               // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
               // to fail.
               commitOffsets();
           }
       }
   }
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -562,7 +556,6 @@ public boolean startTask(
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();

Review comment:
       Sounds good!







[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-677909122


   What do you think, @kkonstantine? Is this something you could look at?





[GitHub] [kafka] C0urante commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-647707645


   Thanks for the insight @michael-carter-instaclustr. This type of direct user feedback and perspective is invaluable! I'm convinced that your interpretation is likely the more useful of the two (assuming that we don't exclusively count failures during `start` and also include failures while trying to instantiate plugins, etc., which it seems like you're on board with).
   
   I haven't reviewed the tests for this change yet but the design seems good and as soon as the code review comments are addressed I'm happy to do another round and, if everything still looks good, give my plus one.





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-649884360


   Is this something you'd be willing to look at @gwenshap ?
   (Apologies if this isn't of interest to you, I'm not very familiar with which committer would be best placed to have a look)





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443387368



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -56,7 +56,7 @@
     private static final String THREAD_NAME_PREFIX = "task-thread-";
 
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
       Yes, I had the same thought and initially started down that road. As I saw it, the source of the problem was that the failure methods inside WorkerConnector/WorkerTask caught every failure and never propagated an exception back up to Worker, where the metrics could be recorded. Simply letting the exception propagate seemed the way to go, but since I needed to distinguish a failure during startup from a failure during regular execution, separating those methods seemed a good way to do it.

       This worked well for the connector at the time, but was harder for the tasks because they were submitted to an executor service, so there wasn't an obvious place in the Worker class to handle exceptions. (I've noticed that since I looked at this, you've committed a change that makes connectors use an executor service too, so this probably now applies to connectors as well as tasks.)

       I wasn't certain whether it mattered that the startup code run on the same thread as the regular execution, but assumed it did, so I started building a chain of CompletableFutures that let me check for exceptions from the other thread and only submit the execute stage if the initialiseAndStart stage completed successfully. But this required two separate entry points into WorkerConnector/WorkerTask, which rather defeated the point of them implementing the Runnable interface, and the exception checking was a bit ugly anyway.

       At that point I discovered the statusListener and thought it might be a cleaner approach. It's not the only way, of course, but it seemed to be the smaller change. Separating those methods would be easier if the startup phase could run on the same thread as the Worker, but that seems like a more significant change?
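A minimal sketch of the CompletableFuture alternative described above, assuming a hypothetical `StartableTask` with separate `initializeAndStart()` and `execute()` entry points (these are illustrative names, not the real WorkerTask API). If the supplied executor is single-threaded, both stages run on the same thread, and `execute()` is only reached when startup completed without throwing. The need for two entry points is exactly what sits awkwardly next to `Runnable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task with separate startup and execution entry points,
// instead of the single run() that Runnable provides.
interface StartableTask {
    void initializeAndStart();
    void execute();
}

// Hypothetical submitter that records the startup outcome and only
// runs execute() if startup succeeded.
final class StartupChain {
    final AtomicInteger startupSuccesses = new AtomicInteger();
    final AtomicInteger startupFailures = new AtomicInteger();

    CompletableFuture<Void> submit(StartableTask task, ExecutorService executor) {
        // Stage 1: startup, run on the supplied executor.
        CompletableFuture<Void> started =
                CompletableFuture.runAsync(task::initializeAndStart, executor);
        // Record the outcome; the returned future keeps stage 1's result or exception.
        CompletableFuture<Void> recorded = started.whenComplete((ignored, error) -> {
            if (error == null) {
                startupSuccesses.incrementAndGet();
            } else {
                startupFailures.incrementAndGet();
            }
        });
        // Stage 2: skipped if stage 1 failed; the returned future then
        // completes exceptionally and execute() never runs.
        return recorded.thenRunAsync(task::execute, executor);
    }
}
```

The exception checking lives in `whenComplete`, off the Worker's thread, which is the "a bit ugly" part mentioned in the comment.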







[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444718803



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -336,14 +336,11 @@ public void testStartConnectorFailure() throws Exception {
             assertEquals(exception, e.getCause());
         }
 
-        assertStartupStatistics(worker, 1, 1, 0, 0);

Review comment:
       Yes, I understand where you're coming from. The thing is that it's no longer the Worker's responsibility to record these metrics, so the Worker unit test doesn't seem like the right place to check them. The new tests in WorkerMetricsGroupTest cover the recording itself. So I think what we're missing is a test that checks that the Worker calls the wrapped status listener when we expect it to. I'll think about it some more.
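One possible shape for the missing check, sketched in plain Java with hypothetical stand-ins (`ConnectorListener`, `RecordingListener`, `MiniWorker`) rather than the real Connect classes or EasyMock. The worker receives a wrapping function (playing the role of `wrapStatusListener`), and the test asserts that events reach both the wrapper and the original listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, trimmed-down stand-in for ConnectorStatus.Listener.
interface ConnectorListener {
    void onStartup(String connector);
    void onFailure(String connector, Throwable cause);
}

// Test double that records every callback it receives, in order.
final class RecordingListener implements ConnectorListener {
    final List<String> events = new ArrayList<>();
    public void onStartup(String connector) { events.add("startup:" + connector); }
    public void onFailure(String connector, Throwable cause) { events.add("failure:" + connector); }
}

// Hypothetical worker: it must report through the *wrapped* listener,
// which is the behaviour the test wants to pin down.
final class MiniWorker {
    private final UnaryOperator<ConnectorListener> wrapper;

    MiniWorker(UnaryOperator<ConnectorListener> wrapper) { this.wrapper = wrapper; }

    void startConnector(String name, Runnable startup, ConnectorListener listener) {
        ConnectorListener wrapped = wrapper.apply(listener);
        try {
            startup.run();
            wrapped.onStartup(name);
        } catch (RuntimeException e) {
            wrapped.onFailure(name, e);
        }
    }
}
```

If the worker ever bypassed the wrapper, the wrapper's recorded events would be empty while the delegate's were not, so the assertion fails.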







[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-871152067


   Did you have any further comments on this, @rhauch?





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444716522



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(connector);

Review comment:
       Good point. Will fix.







[GitHub] [kafka] C0urante commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444445069



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);

Review comment:
       It's a little strange that we're mocking the class that we're testing here. Could we test on a real `WorkerMetricsGroup` object and mock its dependencies (specifically, the `ConnectMetrics` object that it takes in its constructor) instead? Might be a bit more work but would give us stronger guarantees about the accuracy and coverage of these tests.
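A rough illustration of the suggestion, using hypothetical `SensorFactory`, `InMemorySensors` and `StartupMetrics` types in place of `ConnectMetrics` and `WorkerMetricsGroup`. The class under test is real and only its dependency is replaced with an in-memory fake, so the assertions check recorded values rather than which methods happened to be called.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.DoubleConsumer;

// Hypothetical dependency standing in for ConnectMetrics: hands out named sensors.
interface SensorFactory {
    DoubleConsumer sensor(String name);
}

// In-memory fake that sums everything recorded against each sensor name.
final class InMemorySensors implements SensorFactory {
    final Map<String, Double> totals = new HashMap<>();

    public DoubleConsumer sensor(String name) {
        totals.putIfAbsent(name, 0.0);
        return value -> totals.merge(name, value, Double::sum);
    }
}

// Real class under test: a simplified analogue of WorkerMetricsGroup.
final class StartupMetrics {
    private final DoubleConsumer attempts;
    private final DoubleConsumer successes;
    private final DoubleConsumer failures;

    StartupMetrics(SensorFactory factory) {
        attempts = factory.sensor("connector-startup-attempts");
        successes = factory.sensor("connector-startup-successes");
        failures = factory.sensor("connector-startup-failures");
    }

    void recordConnectorStartupSuccess() { attempts.accept(1.0); successes.accept(1.0); }

    void recordConnectorStartupFailure() { attempts.accept(1.0); failures.accept(1.0); }
}
```

With the real `ConnectMetrics`, the equivalent would be to read the resulting metric values back out of the registry, which is more setup but exercises the actual sensor wiring.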

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;
+
+        ConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            delegateListener.onStartup(connector);
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class TaskStatusListener implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegatedListener;
+        private boolean startupSucceeded = false;

Review comment:
       This is going to be modified and accessed on potentially different threads, right? If so, we should add the `volatile` modifier here.
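A small sketch of the concern, using a hypothetical `StartupTrackingListener` rather than the PR's class. `onStartup` and a later `onFailure` may be invoked on different threads; if nothing else orders those calls, the Java Memory Model does not guarantee that the failing thread sees `startupSucceeded == true`, and a post-startup failure could be miscounted as a startup failure. Declaring the flag `volatile` makes the write visible to later reads.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical decorator: counts a startup failure only if startup never succeeded.
final class StartupTrackingListener {
    final AtomicInteger startupSuccesses = new AtomicInteger();
    final AtomicInteger startupFailures = new AtomicInteger();

    // volatile: written by the thread that reports startup,
    // read by whichever thread later reports a failure.
    private volatile boolean startupSucceeded = false;

    void onStartup() {
        startupSucceeded = true;
        startupSuccesses.incrementAndGet();
    }

    void onFailure() {
        if (!startupSucceeded) {
            startupFailures.incrementAndGet();
        }
    }
}
```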

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;

Review comment:
       This is going to be modified and accessed on potentially different threads, right? If so, we should add the `volatile` modifier here.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);

Review comment:
       Nit: we can probably just use the `@Mock` annotation and make these instance variables instead of local variables, so that we don't have to repeat this code at the beginning of each test.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(connector);

Review comment:
       Nit: looks like similar calls use `eq(connector)` instead of `connector`. I think they both work but we should stick to one or the other.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);

Review comment:
       Ahh, I see--we construct a connector status listener for some and a task status listener for others. Honestly, I think it's probably fine if we just make both available as instance variables and `@Mock` them.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(connector);
+        expectLastCall();
+
+        mockWorkerMetricsGroup.recordConnectorStartupSuccess();
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        connectorListener.onStartup(connector);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureAfterStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);
+        final ConnectorStatus.Listener delegate = createStrictMock(ConnectorStatus.Listener.class);
+        final WorkerMetricsGroup.ConnectorStatusListener connectorListener = mockWorkerMetricsGroup.new ConnectorStatusListener(delegate);
+
+        delegate.onStartup(eq(connector));
+        expectLastCall();
+
+        delegate.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        mockWorkerMetricsGroup.recordConnectorStartupSuccess();

Review comment:
       Nit: I think it might make more sense to set the expectations in chronological order instead of grouping them by mocked instance. In this case, this line would move after the expectation for `delegate::onStartup` and before the one for `delegate::onFailure`. Not a big deal, though; if you think the current order is more readable, feel free to leave it as-is.
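The suggested order matches the order in which the decorated listener actually makes its calls. For reference, EasyMock's `createStrictMock` checks call order only within each individual mock, so ordering expectations across the two mocks is a readability choice rather than something the test enforces. As a rough, library-free sketch (`StatusListener`, `RecordingDelegate`, `RecordingMetrics`, and `MetricsRecordingListener` are hypothetical simplifications, not Kafka Connect or EasyMock APIs), a single shared call log makes the cross-object order explicit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ConnectorStatus.Listener.
interface StatusListener {
    void onStartup(String connector);
    void onFailure(String connector, Throwable cause);
}

// Fake delegate that appends every call to a log shared with the fake metrics.
class RecordingDelegate implements StatusListener {
    private final List<String> log;
    RecordingDelegate(List<String> log) { this.log = log; }
    public void onStartup(String connector) { log.add("delegate.onStartup"); }
    public void onFailure(String connector, Throwable cause) { log.add("delegate.onFailure"); }
}

// Fake stand-in for WorkerMetricsGroup, writing into the same shared log.
class RecordingMetrics {
    private final List<String> log;
    RecordingMetrics(List<String> log) { this.log = log; }
    void recordConnectorStartupSuccess() { log.add("metrics.recordConnectorStartupSuccess"); }
    void recordConnectorStartupFailure() { log.add("metrics.recordConnectorStartupFailure"); }
}

// Mirrors the decorator logic quoted in the PR's ConnectorStatusListener.
class MetricsRecordingListener implements StatusListener {
    private final StatusListener delegate;
    private final RecordingMetrics metrics;
    private boolean startupSucceeded = false;

    MetricsRecordingListener(StatusListener delegate, RecordingMetrics metrics) {
        this.delegate = delegate;
        this.metrics = metrics;
    }

    public void onStartup(String connector) {
        delegate.onStartup(connector);
        startupSucceeded = true;
        metrics.recordConnectorStartupSuccess();
    }

    public void onFailure(String connector, Throwable cause) {
        // A failure after a successful startup is not a startup failure.
        if (!startupSucceeded) {
            metrics.recordConnectorStartupFailure();
        }
        delegate.onFailure(connector, cause);
    }
}

public class ChronologicalOrderSketch {
    // Replays the "failure after startup" scenario and returns the call log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        StatusListener listener =
                new MetricsRecordingListener(new RecordingDelegate(log), new RecordingMetrics(log));
        listener.onStartup("org.FakeConnector");
        listener.onFailure("org.FakeConnector", new RuntimeException());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The resulting log lists `delegate.onStartup`, then `metrics.recordConnectorStartupSuccess`, then `delegate.onFailure`, which is the order the nit proposes for the expectations.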

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -336,14 +336,11 @@ public void testStartConnectorFailure() throws Exception {
             assertEquals(exception, e.getCause());
         }
 
-        assertStartupStatistics(worker, 1, 1, 0, 0);

Review comment:
       It's unfortunate that we're losing test coverage here, especially since it makes issues like the one that necessitates this PR more likely as we can't prevent regressions. Is there a way we can keep some of this testing logic, either through modifying the `WorkerTest` or by relocating it to the `WorkerMetricsGroupTest`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444715149



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;
+
+        ConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            delegateListener.onStartup(connector);
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class TaskStatusListener implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegatedListener;
+        private boolean startupSucceeded = false;

Review comment:
       Yep, I think there is a possibility of calling this from different threads. I'll add that in.
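A minimal sketch of the thread-safety change being discussed, assuming "add that in" means marking the `startupSucceeded` flag `volatile` (the thread does not say which mechanism was chosen). `TaskListener`, `NoopTaskListener`, `CountingTaskListener`, and `VolatileFlagSketch` are hypothetical simplifications, not Kafka Connect classes, and `AtomicInteger` counters stand in for the real `Sensor`s:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for TaskStatus.Listener (the real
// interface has more callbacks, such as onPause and onShutdown).
interface TaskListener {
    void onStartup(String task);
    void onFailure(String task, Throwable cause);
}

// No-op delegate used only for the demo.
class NoopTaskListener implements TaskListener {
    public void onStartup(String task) { }
    public void onFailure(String task, Throwable cause) { }
}

// Decorator that counts startup outcomes before forwarding to the delegate.
class CountingTaskListener implements TaskListener {
    final AtomicInteger startupSuccesses = new AtomicInteger();
    final AtomicInteger startupFailures = new AtomicInteger();
    private final TaskListener delegate;
    // volatile: the flag is written in onStartup and read in onFailure,
    // which may be invoked on different threads.
    private volatile boolean startupSucceeded = false;

    CountingTaskListener(TaskListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onStartup(String task) {
        delegate.onStartup(task);
        startupSucceeded = true;
        startupSuccesses.incrementAndGet();
    }

    @Override
    public void onFailure(String task, Throwable cause) {
        // Only a failure seen before a successful startup counts as a startup failure.
        if (!startupSucceeded) {
            startupFailures.incrementAndGet();
        }
        delegate.onFailure(task, cause);
    }
}

public class VolatileFlagSketch {
    public static void main(String[] args) throws InterruptedException {
        CountingTaskListener listener = new CountingTaskListener(new NoopTaskListener());
        // Startup is reported on one thread...
        Thread taskThread = new Thread(() -> listener.onStartup("org.FakeConnector-0"));
        taskThread.start();
        taskThread.join();
        // ...and a later failure is reported on another.
        listener.onFailure("org.FakeConnector-0", new RuntimeException());
        System.out.println(listener.startupSuccesses.get() + " " + listener.startupFailures.get());
    }
}
```

In this demo `join()` already establishes a happens-before edge, so it would behave the same without `volatile`; the keyword matters when no other synchronization links the two threads. Note also that `volatile` gives visibility, not atomicity: if the two callbacks could truly race, an `AtomicBoolean` or a lock would be needed to make the check-then-record step atomic.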

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private boolean startupSucceeded = false;

Review comment:
       Yep, I think there is a possibility of calling this from different threads. I'll add that in.







[GitHub] [kafka] rhauch commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-809581891


   @C0urante said this:
   > I'm also on the fence about the reshuffling of where `recordConnectorStartupSuccess` and `recordTaskSuccess` are called. The descriptions of the connector/task startup metrics in [KIP-196](https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework) are a little thin; for example, the doc for `connector-startup-success-total` is "The total number of connector starts that succeeded.". A question that we'll want to answer before merging this is: if the framework successfully instantiates a connector and is able to call `start` on it, should that alone qualify as a "successful" startup, or does the call to `start` also have to go off without a hitch?
   
   Later, @michael-carter-instaclustr replied:
   > My fundamental assumption in approaching this was that the ‘connector-startup-failure-total’ metric, described in the KIP as ‘The total number of connector starts that failed’, was intended to be a numerical record of failures within the ‘start’ method of the connector (And likewise for the task based metrics). Or in other words, they represented the health of the worker in an integration sense. (e.g. Does the worker have the right connectivity to do its job? Are people submitting valid configurations or are the users of Connect not understanding how to use it?) This to me seems like a useful aggregate metric that relates to the use of the worker as a whole more than a record of any individual connector failure.
   
   Indeed, the original intent of the `connector-startup-success-total` metric was to count the connectors whose `start()` method ran without throwing an exception. The `connector-startup-failure-total` metric, on the other hand, was meant to count the connectors whose `start()` method did throw an exception.
   
   The same applies to tasks and the corresponding task metrics.
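The intended semantics can be sketched as follows, assuming the attempts, successes, and failures are plain counters rather than Kafka `Sensor`s; `StartupMetrics` and `startAndRecord` are hypothetical names used only for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counters loosely mirroring the KIP-196 startup metrics.
class StartupMetrics {
    final AtomicLong attempts = new AtomicLong();
    final AtomicLong successes = new AtomicLong();
    final AtomicLong failures = new AtomicLong();
}

public class StartupSemanticsSketch {
    // Counts one attempt, then classifies it by whether start() threw.
    static boolean startAndRecord(Runnable start, StartupMetrics metrics) {
        metrics.attempts.incrementAndGet();
        try {
            start.run();
        } catch (RuntimeException e) {
            // start() threw: counts toward connector-startup-failure-total.
            metrics.failures.incrementAndGet();
            return false;
        }
        // start() returned normally: counts toward connector-startup-success-total.
        metrics.successes.incrementAndGet();
        return true;
    }

    public static void main(String[] args) {
        StartupMetrics metrics = new StartupMetrics();
        startAndRecord(() -> { }, metrics);
        startAndRecord(() -> { throw new IllegalStateException("bad config"); }, metrics);
        System.out.println(metrics.attempts.get() + " attempts, "
                + metrics.successes.get() + " succeeded, "
                + metrics.failures.get() + " failed");
    }
}
```

Running `main` prints `2 attempts, 1 succeeded, 1 failed`. In this sketch only an exception thrown from `start()` itself counts as a failure; a failure reported later, after `start()` has returned, would not change either counter.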





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-655189921


   Perhaps this is something you would be interested in, @rhauch?





[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-660778556


   Perhaps it's something you could look at, @mjsax?





[GitHub] [kafka] rhauch commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r603405341



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -225,12 +225,16 @@ private synchronized void tryStop() {
         }
     }
 
+    @Override
+    protected void initializeAndStart() {
+        task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
+        task.start(taskConfig);
+        log.info("{} Source task finished initialization and start", this);
+    }
+
     @Override
     public void execute() {
         try {
-            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
-            task.start(taskConfig);
-            log.info("{} Source task finished initialization and start", this);

Review comment:
       We're removing the INFO-level log message here, which we use to help identify that the worker task entered this method. It might be good to keep an INFO-level log message here, but use something like `{} Executing source task`. WDYT?
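A rough sketch of where the two INFO-level messages would end up after the split, assuming `initializeAndStart()` is invoked before `execute()` (the diff shows the override but not its caller). `SourceTaskSketch`, its `run()` driver, its `toString()` value, and the `infoLog` list, which stands in for the SLF4J logger, are all hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton, not the real WorkerSourceTask. A List stands in
// for the SLF4J logger so the messages can be inspected.
class SourceTaskSketch {
    final List<String> infoLog = new ArrayList<>();

    // Runs once, before the main loop.
    protected void initializeAndStart() {
        // task.initialize(...) and task.start(taskConfig) would run here.
        infoLog.add(this + " Source task finished initialization and start");
    }

    // Entered after initializeAndStart(); the suggested message marks entry.
    public void execute() {
        infoLog.add(this + " Executing source task");
        // The polling loop would run here.
    }

    // Hypothetical driver reflecting the assumed call order.
    void run() {
        initializeAndStart();
        execute();
    }

    @Override
    public String toString() {
        return "SourceTaskSketch{id=example-0}";
    }
}

public class ExecuteLogSketch {
    public static void main(String[] args) {
        SourceTaskSketch task = new SourceTaskSketch();
        task.run();
        task.infoLog.forEach(System.out::println);
    }
}
```

With both messages in place, the log still shows that the task entered `execute()`, even though initialization now happens in a separate method.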

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {

Review comment:
       Nit: the methods of the `ConnectorStatusListener` and `TaskStatusListener` classes are in very different orders. It would help readability to have them in the same order. IMO, the order of the `TaskStatusListener` methods is nice because it follows the lifecycle.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -336,14 +336,11 @@ public void testStartConnectorFailure() throws Exception {
             assertEquals(exception, e.getCause());
         }
 
-        assertStartupStatistics(worker, 1, 1, 0, 0);

Review comment:
       Both of you raise good points, but I tend to agree with @C0urante that it's better to keep these checks. @michael-carter-instaclustr is right that the new `WorkerMetricsGroupTest` is where we should validate that `WorkerMetricsGroup` works correctly. These assertions, however, verify that the `Worker` is calling the `WorkerMetricsGroup` correctly, and they help detect regressions. Besides, there are still lots of places within `WorkerTest` where the `assertStartupStatistics(...)` method is called, so why keep only some of these checks rather than all of them?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -191,9 +191,9 @@ public void transitionTo(TargetState state) {
         consumer.wakeup();
     }
 
+
     @Override
     public void execute() {
-        initializeAndStart();

Review comment:
       We never really had an INFO-level log message here (unlike `WorkerSourceTask.execute()`), though we could always tell from the INFO-level log message in `initializeAndStart()`. Since the latter now does a bit more work, it's probably better to have an INFO-level log message at the start of `execute()`. What do you think about adding one here, using something like `{} Executing sink task` (similar to `WorkerSourceTask.execute()`)?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -191,9 +191,9 @@ public void transitionTo(TargetState state) {
         consumer.wakeup();
     }
 
+

Review comment:
       Let's avoid unnecessary changes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private volatile boolean startupSucceeded = false;
+
+        ConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            delegateListener.onStartup(connector);
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class TaskStatusListener implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegatedListener;
+        private volatile boolean startupSucceeded = false;
+
+        TaskStatusListener(TaskStatus.Listener delegatedListener) {
+            this.delegatedListener = delegatedListener;
+        }
+
+        @Override
+        public void onStartup(final ConnectorTaskId id) {
+            recordTaskSuccess();
+            startupSucceeded = true;
+            delegatedListener.onStartup(id);

Review comment:
       Why is the order of these methods different from the order in `ConnectorStatusListener`? 
   
   Also, the `TaskStatusListener` methods always forward the call to the delegate _last_, whereas the `ConnectorStatusListener` methods use a mixture. Let's make them consistent.
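   To illustrate the consistency being asked for, here is a minimal sketch of a decorator that uses one ordering in every callback: update the metrics first, then forward to the delegate last (the order `TaskStatusListener` already uses). The `SimpleTaskListener` interface and the `AtomicInteger` counters are hypothetical stand-ins for `TaskStatus.Listener` and the `WorkerMetricsGroup` sensors:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, trimmed-down listener; the real TaskStatus.Listener has more callbacks.
interface SimpleTaskListener {
    void onStartup(String taskId);
    void onFailure(String taskId, Throwable cause);
    void onShutdown(String taskId);
}

// Decorator that follows one rule everywhere: record metrics, then delegate last.
class MetricsRecordingTaskListener implements SimpleTaskListener {
    final AtomicInteger startupSuccesses = new AtomicInteger();
    final AtomicInteger startupFailures = new AtomicInteger();
    private final SimpleTaskListener delegate;
    private volatile boolean startupSucceeded = false;

    MetricsRecordingTaskListener(SimpleTaskListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onStartup(String taskId) {
        startupSucceeded = true;
        startupSuccesses.incrementAndGet();
        delegate.onStartup(taskId);
    }

    @Override
    public void onFailure(String taskId, Throwable cause) {
        // Only a failure that happens before a successful startup is a startup failure.
        if (!startupSucceeded) {
            startupFailures.incrementAndGet();
        }
        delegate.onFailure(taskId, cause);
    }

    @Override
    public void onShutdown(String taskId) {
        // Nothing to record; forwarding is still the last (and only) step.
        delegate.onShutdown(taskId);
    }
}
```

   Whichever order is chosen, applying it uniformly makes it easier to reason about what the delegate (for example, the status backing store) observes relative to the metrics.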

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -151,6 +151,8 @@ public void removeMetrics() {
         taskMetricsGroup.close();
     }
 
+    protected abstract void initializeAndStart();

Review comment:
       I see how you've just pulled the `WorkerSinkTask.initializeAndStart()` method up to this abstract class (per @C0urante's suggestion). That is a nice, clean way to encapsulate that logic in the base class and keep the metrics behavior hidden from the subclasses. Nicely done.
   
   It is a tiny bit unfortunate that the tests need to do something like:
   ```
           workerSourceTask.initialize(TASK_CONFIG);   // This just sets the config on the worker task
           workerSourceTask.initializeAndStart();      // This calls task.initialize(...) and task.start(...)
           workerSourceTask.execute();
   ```
   But the `initializeAndStart()` method in `WorkerSinkTask` has been around since the beginning, and it's probably not worth changing here. After all, using `initializeAndStart()` still makes sense _within_ `WorkerSinkTask`, and now also within the `WorkerTask` and `WorkerSourceTask` classes. So I'm fine with keeping that method name as-is.
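   The template-method shape described above can be sketched roughly as follows. This is a simplified, hypothetical stand-in (`SketchWorkerTask`, its nested `StatusListener`, and the `run()` body are assumptions, not the actual `WorkerTask` code): the base class reports startup only after `initializeAndStart()` returns normally and reports failure if anything throws, so subclasses never touch the metrics.

```java
// Hypothetical, heavily simplified stand-in for WorkerTask.
abstract class SketchWorkerTask implements Runnable {
    interface StatusListener {
        void onStartup(String id);
        void onFailure(String id, Throwable cause);
    }

    protected final String id;
    private final StatusListener statusListener;

    SketchWorkerTask(String id, StatusListener statusListener) {
        this.id = id;
        this.statusListener = statusListener;
    }

    // Subclass hook: initialize and start the connector's task.
    protected abstract void initializeAndStart();

    // Subclass hook: the main processing loop.
    protected abstract void execute();

    @Override
    public final void run() {
        try {
            initializeAndStart();
            // Reached only if startup completed, so a metrics-recording listener
            // sees exactly one startup outcome per task.
            statusListener.onStartup(id);
            execute();
        } catch (RuntimeException e) {
            statusListener.onFailure(id, e);
        }
    }
}
```

   Because `run()` is `final`, a subclass cannot accidentally bypass the startup/failure reporting; it only supplies the two hooks.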




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-883778218


   Awesome. Thanks, @rhauch!





[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443935161



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -56,7 +56,7 @@
     private static final String THREAD_NAME_PREFIX = "task-thread-";
 
     protected final ConnectorTaskId id;
-    private final TaskStatus.Listener statusListener;
+    protected final TaskStatus.Listener statusListener;

Review comment:
       Ah, of course! Yes that seems better, I'll do that.







[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r444715998



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        final WorkerMetricsGroup mockWorkerMetricsGroup = createStrictMock(WorkerMetricsGroup.class);

Review comment:
       Yeah, I started doing that. It got very messy, as the constructor for `WorkerMetricsGroup` has a lot of dependencies. I'm happy to give it another go, though.







[GitHub] [kafka] michael-carter-instaclustr commented on a change in pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

Posted by GitBox <gi...@apache.org>.
michael-carter-instaclustr commented on a change in pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#discussion_r443385491



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -289,7 +285,6 @@ public void startConnector(
                 // Can't be put in a finally block because it needs to be swapped before the call on
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
-                workerMetricsGroup.recordConnectorStartupFailure();

Review comment:
       Good point. I believe we could achieve the same effect by wrapping the `statusListener` earlier in the method and using the wrapped listener in the exception handler. That way, the only way we could fail without recording a metric would be if the wrapping itself failed, which is unlikely since the wrapper's constructor is dead simple.
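   A minimal sketch of that idea, assuming hypothetical `SketchWorker` and `ConnectorListener` types, with a trivial `wrap(...)` standing in for `WorkerMetricsGroup.wrapStatusListener(...)` and plain `int` counters standing in for the sensors. For brevity, it also reports startup from the same method:

```java
// Hypothetical sketch of Worker.startConnector(): wrap before anything can throw,
// then report failures through the wrapper so the metric is always recorded.
class SketchWorker {
    interface ConnectorListener {
        void onStartup(String connector);
        void onFailure(String connector, Throwable cause);
    }

    int startupSuccesses = 0;
    int startupFailures = 0;

    // Stand-in for wrapStatusListener(...): records metrics, then forwards.
    ConnectorListener wrap(ConnectorListener delegate) {
        return new ConnectorListener() {
            private boolean started = false;

            @Override
            public void onStartup(String connector) {
                started = true;
                startupSuccesses++;
                delegate.onStartup(connector);
            }

            @Override
            public void onFailure(String connector, Throwable cause) {
                if (!started) {
                    startupFailures++;
                }
                delegate.onFailure(connector, cause);
            }
        };
    }

    boolean startConnector(String name, Runnable instantiateAndStart, ConnectorListener statusListener) {
        // Wrapping happens first; its constructor is trivial, so every later
        // failure goes through the wrapper and is counted.
        final ConnectorListener wrapped = wrap(statusListener);
        try {
            instantiateAndStart.run();
            wrapped.onStartup(name);
            return true;
        } catch (RuntimeException e) {
            wrapped.onFailure(name, e);
            return false;
        }
    }
}
```

   With this shape the exception handler no longer needs its own explicit `recordConnectorStartupFailure()` call, which is what the removed line in the diff above used to do.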



