Posted to commits@kafka.apache.org by rh...@apache.org on 2021/10/28 22:20:21 UTC

[kafka] branch 3.0 updated: KAFKA-9887 fix failed task or connector count on startup failure (#8844)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7f67691  KAFKA-9887 fix failed task or connector count on startup failure (#8844)
7f67691 is described below

commit 7f6769110a52e07d0f406e188e570747eb941d8a
Author: Michael Carter <53...@users.noreply.github.com>
AuthorDate: Wed Jul 21 08:39:26 2021 +1000

    KAFKA-9887 fix failed task or connector count on startup failure (#8844)
    
    Moved the responsibility for recording task and connector startup and failure metrics from the
    invocation code into the status listeners. The reason is that the WorkerTasks (and subclasses)
    were either not propagating exceptions upwards, or could not easily do so because they run on
    completely different threads.
    
    Also split WorkerMetricsGroup out of Worker, turning it from an inner class into a top-level
    class so that the Data Abstraction Count Checkstyle rule is not violated.
    
    Author: Michael Carter <mi...@instaclustr.com>
    Reviewers: Chris Egerton <ch...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   | 105 +--------
 .../kafka/connect/runtime/WorkerMetricsGroup.java  | 214 ++++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTask.java      |   3 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |  25 ++-
 .../apache/kafka/connect/runtime/WorkerTask.java   |   6 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   2 +
 .../connect/runtime/WorkerMetricsGroupTest.java    | 249 +++++++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   3 +
 .../kafka/connect/runtime/WorkerTaskTest.java      |   8 +
 .../apache/kafka/connect/runtime/WorkerTest.java   |   6 +-
 10 files changed, 507 insertions(+), 114 deletions(-)
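
For readers skimming the diff below, the gist of the change is a delegating status listener:
the worker wraps the caller-supplied listener so that startup success/failure metrics are
recorded wherever the listener callbacks actually fire, even when that is on a different thread
than the code that started the connector or task. A minimal sketch of that idea follows; the
interface and helper names here are illustrative stand-ins, not the actual
org.apache.kafka.connect.runtime classes (see WorkerMetricsGroup in the diff for the real thing).

    // Minimal sketch of the delegating status-listener idea from this commit.
    // StartupListener and the record* helpers stand in for ConnectorStatus.Listener /
    // TaskStatus.Listener and the WorkerMetricsGroup sensors.
    interface StartupListener {
        void onStartup(String name);
        void onFailure(String name, Throwable cause);
    }

    class MetricsRecordingListener implements StartupListener {
        private final StartupListener delegate;
        private volatile boolean startupSucceeded = false;

        MetricsRecordingListener(StartupListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onStartup(String name) {
            startupSucceeded = true;
            recordStartupSuccess();       // metrics are recorded where the callback fires,
            delegate.onStartup(name);     // regardless of which thread invokes it
        }

        @Override
        public void onFailure(String name, Throwable cause) {
            // Count a startup failure only if startup never succeeded; a failure
            // after a successful startup is not a startup failure.
            if (!startupSucceeded) {
                recordStartupFailure();
            }
            delegate.onFailure(name, cause);
        }

        private void recordStartupSuccess() { /* sensor.record(...) in the real code */ }
        private void recordStartupFailure() { /* sensor.record(...) in the real code */ }
    }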

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 73201b5..11af818 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.CumulativeSum;
-import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -59,8 +55,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.LoggingContext;
 import org.apache.kafka.connect.util.SinkUtils;
@@ -147,7 +143,7 @@ public class Worker {
         this.plugins = plugins;
         this.config = config;
         this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
-        this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
+        this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics);
 
         Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
         this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig);
@@ -247,6 +243,7 @@ public class Worker {
             TargetState initialState,
             Callback<TargetState> onConnectorStateChange
     ) {
+        final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
             if (connectors.containsKey(connName)) {
                 onConnectorStateChange.onCompletion(
@@ -274,7 +271,7 @@ public class Worker {
                 final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
                         offsetBackingStore, connName, internalKeyConverter, internalValueConverter);
                 workerConnector = new WorkerConnector(
-                        connName, connector, connConfig, ctx, metrics, statusListener, offsetReader, connectorLoader);
+                        connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, connectorLoader);
                 log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                 workerConnector.transitionTo(initialState, onConnectorStateChange);
                 Plugins.compareAndSwapLoaders(savedLoader);
@@ -283,8 +280,7 @@ public class Worker {
                 // Can't be put in a finally block because it needs to be swapped before the call on
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
-                workerMetricsGroup.recordConnectorStartupFailure();
-                statusListener.onFailure(connName, t);
+                connectorStatusListener.onFailure(connName, t);
                 onConnectorStateChange.onCompletion(t, null);
                 return;
             }
@@ -302,7 +298,6 @@ public class Worker {
             executor.submit(workerConnector);
 
             log.info("Finished creating connector {}", connName);
-            workerMetricsGroup.recordConnectorStartupSuccess();
         }
     }
 
@@ -499,6 +494,7 @@ public class Worker {
             TargetState initialState
     ) {
         final WorkerTask workerTask;
+        final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
         try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
             log.info("Creating task {}", id);
 
@@ -546,8 +542,8 @@ public class Worker {
                     log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
                 }
 
-                workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter,
-                                             headerConverter, connectorLoader);
+                workerTask = buildWorkerTask(configState, connConfig, id, task, taskStatusListener,
+                        initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
                 workerTask.initialize(taskConfig);
                 Plugins.compareAndSwapLoaders(savedLoader);
             } catch (Throwable t) {
@@ -556,8 +552,7 @@ public class Worker {
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();
-                statusListener.onFailure(id, t);
+                taskStatusListener.onFailure(id, t);
                 return false;
             }
 
@@ -569,7 +564,6 @@ public class Worker {
             if (workerTask instanceof WorkerSourceTask) {
                 sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
             }
-            workerMetricsGroup.recordTaskSuccess();
             return true;
         }
     }
@@ -1056,85 +1050,4 @@ public class Worker {
         }
     }
 
-    class WorkerMetricsGroup {
-        private final MetricGroup metricGroup;
-        private final Sensor connectorStartupAttempts;
-        private final Sensor connectorStartupSuccesses;
-        private final Sensor connectorStartupFailures;
-        private final Sensor connectorStartupResults;
-        private final Sensor taskStartupAttempts;
-        private final Sensor taskStartupSuccesses;
-        private final Sensor taskStartupFailures;
-        private final Sensor taskStartupResults;
-
-        public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
-            ConnectMetricsRegistry registry = connectMetrics.registry();
-            metricGroup = connectMetrics.group(registry.workerGroupName());
-
-            metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
-            metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
-
-            MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
-            MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
-            Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
-            connectorStartupResults = metricGroup.sensor("connector-startup-results");
-            connectorStartupResults.add(connectorStartupResultFrequencies);
-
-            connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
-            connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
-
-            connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
-            connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
-
-            connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
-            connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
-
-            MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
-            MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
-            Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
-            taskStartupResults = metricGroup.sensor("task-startup-results");
-            taskStartupResults.add(taskStartupResultFrequencies);
-
-            taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
-            taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
-
-            taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
-            taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
-
-            taskStartupFailures = metricGroup.sensor("task-startup-failures");
-            taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
-        }
-
-        void close() {
-            metricGroup.close();
-        }
-
-        void recordConnectorStartupFailure() {
-            connectorStartupAttempts.record(1.0);
-            connectorStartupFailures.record(1.0);
-            connectorStartupResults.record(0.0);
-        }
-
-        void recordConnectorStartupSuccess() {
-            connectorStartupAttempts.record(1.0);
-            connectorStartupSuccesses.record(1.0);
-            connectorStartupResults.record(1.0);
-        }
-
-        void recordTaskFailure() {
-            taskStartupAttempts.record(1.0);
-            taskStartupFailures.record(1.0);
-            taskStartupResults.record(0.0);
-        }
-
-        void recordTaskSuccess() {
-            taskStartupAttempts.record(1.0);
-            taskStartupSuccesses.record(1.0);
-            taskStartupResults.record(1.0);
-        }
-
-        protected MetricGroup metricGroup() {
-            return metricGroup;
-        }
-    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
new file mode 100644
index 0000000..f03bc4f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private volatile boolean startupSucceeded = false;
+
+        ConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+            delegateListener.onStartup(connector);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onRestart(String connector) {
+            delegateListener.onRestart(connector);
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class TaskStatusListener implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegatedListener;
+        private volatile boolean startupSucceeded = false;
+
+        TaskStatusListener(TaskStatus.Listener delegatedListener) {
+            this.delegatedListener = delegatedListener;
+        }
+
+        @Override
+        public void onStartup(final ConnectorTaskId id) {
+            recordTaskSuccess();
+            startupSucceeded = true;
+            delegatedListener.onStartup(id);
+        }
+
+        @Override
+        public void onPause(final ConnectorTaskId id) {
+            delegatedListener.onPause(id);
+        }
+
+        @Override
+        public void onResume(final ConnectorTaskId id) {
+            delegatedListener.onResume(id);
+        }
+
+        @Override
+        public void onFailure(final ConnectorTaskId id, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordTaskFailure();
+            }
+            delegatedListener.onFailure(id, cause);
+        }
+
+        @Override
+        public void onRestart(ConnectorTaskId id) {
+            delegatedListener.onRestart(id);
+        }
+
+        @Override
+        public void onShutdown(final ConnectorTaskId id) {
+            delegatedListener.onShutdown(id);
+        }
+
+        @Override
+        public void onDeletion(final ConnectorTaskId id) {
+            delegatedListener.onDeletion(id);
+        }
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1229fec..312747b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -193,7 +193,7 @@ class WorkerSinkTask extends WorkerTask {
 
     @Override
     public void execute() {
-        initializeAndStart();
+        log.info("{} Executing sink task", this);
         // Make sure any uncommitted data has been committed and the task has
         // a chance to clean up its state
         try (UncheckedCloseable suppressible = this::closePartitions) {
@@ -290,6 +290,7 @@ class WorkerSinkTask extends WorkerTask {
     /**
      * Initializes and starts the SinkTask.
      */
+    @Override
     protected void initializeAndStart() {
         SinkConnectorConfig.validate(taskConfig);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index c66e0fc..7307ec6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -219,18 +219,23 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
+    protected void initializeAndStart() {
+        // If we try to start the task at all by invoking initialize, then count this as
+        // "started" and expect a subsequent call to the task's stop() method
+        // to properly clean up any resources allocated by its initialize() or
+        // start() methods. If the task throws an exception during stop(),
+        // the worst thing that happens is another exception gets logged for an already-
+        // failed task
+        started = true;
+        task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
+        task.start(taskConfig);
+        log.info("{} Source task finished initialization and start", this);
+    }
+
+    @Override
     public void execute() {
         try {
-            // If we try to start the task at all by invoking initialize, then count this as
-            // "started" and expect a subsequent call to the task's stop() method
-            // to properly clean up any resources allocated by its initialize() or 
-            // start() methods. If the task throws an exception during stop(),
-            // the worst thing that happens is another exception gets logged for an already-
-            // failed task
-            started = true;
-            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
-            task.start(taskConfig);
-            log.info("{} Source task finished initialization and start", this);
+            log.info("{} Executing source task", this);
             while (!isStopping()) {
                 if (shouldPause()) {
                     onPause();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 58fe061..e41b061 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -148,6 +148,8 @@ abstract class WorkerTask implements Runnable {
         taskMetricsGroup.close();
     }
 
+    protected abstract void initializeAndStart();
+
     protected abstract void execute();
 
     protected abstract void close();
@@ -179,10 +181,10 @@ abstract class WorkerTask implements Runnable {
                     onPause();
                     if (!awaitUnpause()) return;
                 }
-
-                statusListener.onStartup(id);
             }
 
+            initializeAndStart();
+            statusListener.onStartup(id);
             execute();
         } catch (Throwable t) {
             if (cancelled) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index c6f79fe..b0cd0b4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -394,6 +394,7 @@ public class ErrorHandlingTaskTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
@@ -459,6 +460,7 @@ public class ErrorHandlingTaskTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
new file mode 100644
index 0000000..2eb20ce
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class, MetricName.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    private ConnectMetrics connectMetrics;
+    
+    private Sensor connectorStartupResults;
+    private Sensor connectorStartupAttempts;
+    private Sensor connectorStartupSuccesses;
+    private Sensor connectorStartupFailures;
+
+    private Sensor taskStartupResults;
+    private Sensor taskStartupAttempts;
+    private Sensor taskStartupSuccesses;
+    private Sensor taskStartupFailures;
+
+    private ConnectorStatus.Listener delegateConnectorListener;
+    private TaskStatus.Listener delegateTaskListener;
+
+    @Before
+    public void setup() {
+        connectMetrics = PowerMock.createMock(ConnectMetrics.class);
+        ConnectMetricsRegistry connectMetricsRegistry = PowerMock.createNiceMock(ConnectMetricsRegistry.class);
+        ConnectMetrics.MetricGroup metricGroup = PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+
+        connectMetrics.registry();
+        expectLastCall().andReturn(connectMetricsRegistry);
+
+        connectMetrics.group(anyString());
+        expectLastCall().andReturn(metricGroup);
+
+        MetricName metricName = PowerMock.createMock(MetricName.class);
+        metricGroup.metricName(anyObject(MetricNameTemplate.class));
+        expectLastCall().andStubReturn(metricName);
+
+        connectorStartupResults = mockSensor(metricGroup, "connector-startup-results");
+        connectorStartupAttempts = mockSensor(metricGroup, "connector-startup-attempts");
+        connectorStartupSuccesses = mockSensor(metricGroup, "connector-startup-successes");
+        connectorStartupFailures = mockSensor(metricGroup, "connector-startup-failures");
+
+        taskStartupResults = mockSensor(metricGroup, "task-startup-results");
+        taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
+        taskStartupSuccesses = mockSensor(metricGroup, "task-startup-successes");
+        taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
+
+        delegateConnectorListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
+        delegateTaskListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+    }
+
+    private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String name) {
+        Sensor sensor = PowerMock.createMock(Sensor.class);
+        metricGroup.sensor(eq(name));
+        expectLastCall().andReturn(sensor);
+
+        sensor.add(anyObject(CompoundStat.class));
+        expectLastCall().andStubReturn(true);
+
+        sensor.add(anyObject(MetricName.class), anyObject(CumulativeSum.class));
+        expectLastCall().andStubReturn(true);
+
+        return sensor;
+    }
+    
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        delegateConnectorListener.onStartup(eq(connector));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+
+        connectorListener.onStartup(connector);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureAfterStartupRecordedMetrics() {
+        delegateConnectorListener.onStartup(eq(connector));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(1.0));
+        expectLastCall();
+        
+        delegateConnectorListener.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        // recordConnectorStartupFailure() should not be called if failure happens after a successful startup
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+
+        connectorListener.onStartup(connector);
+        connectorListener.onFailure(connector, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureBeforeStartupRecordedMetrics() {
+        delegateConnectorListener.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupFailures.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(0.0));
+        expectLastCall();
+        
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+        
+        connectorListener.onFailure(connector, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskStartupRecordedMetrics() {
+        delegateTaskListener.onStartup(eq(task));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onStartup(task);
+
+        PowerMock.verifyAll();
+    }
+    
+    @Test
+    public void testTaskFailureAfterStartupRecordedMetrics() {
+        delegateTaskListener.onStartup(eq(task));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        delegateTaskListener.onFailure(eq(task), eq(exception));
+        expectLastCall();
+
+        // recordTaskFailure() should not be called if failure happens after a successful startup
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onStartup(task);
+        taskListener.onFailure(task, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskFailureBeforeStartupRecordedMetrics() {
+        delegateTaskListener.onFailure(eq(task), eq(exception));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupFailures.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(0.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onFailure(task, exception);
+
+        PowerMock.verifyAll();
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 651ef5b..7a2a6e4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -620,6 +620,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         workerTask.execute();
 
         assertEquals(0, workerTask.commitFailures());
@@ -971,6 +972,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         try {
             workerTask.execute();
             fail("workerTask.execute should have thrown an exception");
@@ -1010,6 +1012,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         try {
             workerTask.execute();
             fail("workerTask.execute should have thrown an exception");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 81912f2..890c0f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -92,6 +92,7 @@ public class WorkerTaskTest {
                 .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
                         retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
                 .addMockedMethod("initialize")
+                .addMockedMethod("initializeAndStart")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
@@ -99,6 +100,9 @@ public class WorkerTaskTest {
         workerTask.initialize(TASK_CONFIG);
         expectLastCall();
 
+        workerTask.initializeAndStart();
+        expectLastCall();
+
         workerTask.execute();
         expectLastCall();
 
@@ -179,6 +183,7 @@ public class WorkerTaskTest {
                 .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
                         retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
                 .addMockedMethod("initialize")
+                .addMockedMethod("initializeAndStart")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
@@ -194,6 +199,9 @@ public class WorkerTaskTest {
         workerTask.initialize(TASK_CONFIG);
         EasyMock.expectLastCall();
 
+        workerTask.initializeAndStart();
+        EasyMock.expectLastCall();
+
         workerTask.execute();
         expectLastCall().andAnswer(() -> {
             thread.start();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index fe89745..046a9d9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -26,9 +26,9 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -692,20 +692,16 @@ public class WorkerTest extends ThreadedTest {
         worker.herder = herder;
         worker.start();
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
 
         PowerMock.verifyAll();
     }