You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/10/05 18:23:14 UTC

[2/2] kafka git commit: KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)

KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)

Added metrics to the Connect worker and rebalancing metrics to the distributed herder.

This is built on top of #3987, and I can rebase this PR once that is merged.

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #4011 from rhauch/kafka-5903


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a47bfbca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a47bfbca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a47bfbca

Branch: refs/heads/trunk
Commit: a47bfbcae050659d32f777ed2f4b26dda5fbdbbd
Parents: ff5fc9d
Author: Randall Hauch <rh...@gmail.com>
Authored: Thu Oct 5 11:23:11 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Oct 5 11:23:11 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/ConnectMetrics.java   |  61 +++++++---
 .../connect/runtime/ConnectMetricsRegistry.java | 121 ++++++++++++++-----
 .../apache/kafka/connect/runtime/Worker.java    | 111 +++++++++++++++++
 .../kafka/connect/runtime/WorkerConnector.java  |  37 ++++--
 .../kafka/connect/runtime/WorkerTask.java       |  23 ++--
 .../runtime/distributed/DistributedHerder.java  |  87 ++++++++++++-
 .../connect/runtime/MockConnectMetrics.java     |  82 +++++++++++--
 .../connect/runtime/WorkerConnectorTest.java    |  44 ++++++-
 .../connect/runtime/WorkerSinkTaskTest.java     |  54 ++++-----
 .../connect/runtime/WorkerSourceTaskTest.java   |  32 ++---
 .../kafka/connect/runtime/WorkerTaskTest.java   |   4 +-
 .../kafka/connect/runtime/WorkerTest.java       | 101 ++++++++++++++--
 .../distributed/DistributedHerderTest.java      |  92 +++++++++++++-
 13 files changed, 704 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 974967a..3cd1eae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -19,8 +19,8 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
@@ -68,9 +68,11 @@ public class ConnectMetrics {
         this.time = time;
 
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                                            .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
-                                            .recordLevel(Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
-        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+                                                      .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                                                                  TimeUnit.MILLISECONDS).recordLevel(
+                        Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
+        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                                                                        MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
         this.metrics = new Metrics(metricConfig, reporters, time);
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
@@ -121,7 +123,8 @@ public class ConnectMetrics {
         if (group == null) {
             group = new MetricGroup(groupId);
             MetricGroup previous = groupsByName.putIfAbsent(groupId, group);
-            if (previous != null) group = previous;
+            if (previous != null)
+                group = previous;
         }
         return group;
     }
@@ -204,7 +207,8 @@ public class ConnectMetrics {
 
         @Override
         public boolean equals(Object obj) {
-            if (obj == this) return true;
+            if (obj == this)
+                return true;
             if (obj instanceof MetricGroupId) {
                 MetricGroupId that = (MetricGroupId) obj;
                 return this.groupName.equals(that.groupName) && this.tags.equals(that.tags);
@@ -290,19 +294,38 @@ public class ConnectMetrics {
         }
 
         /**
-         * Add to this group an indicator metric with a function that will be used to obtain the indicator state.
+         * Add to this group an indicator metric with a function that returns the current value.
          *
          * @param nameTemplate the name template for the metric; may not be null
-         * @param predicate    the predicate function used to determine the indicator state; may not be null
+         * @param supplier     the function used to determine the literal value of the metric; may not be null
          * @throws IllegalArgumentException if the name is not valid
          */
-        public void addIndicatorMetric(MetricNameTemplate nameTemplate, final IndicatorPredicate predicate) {
+        public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
             MetricName metricName = metricName(nameTemplate);
             if (metrics().metric(metricName) == null) {
-                metrics().addMetric(metricName, new Measurable() {
+                metrics().addMetric(metricName, new Gauge<T>() {
                     @Override
-                    public double measure(MetricConfig config, long now) {
-                        return predicate.matches() ? 1.0d : 0.0d;
+                    public T value(MetricConfig config, long now) {
+                        return supplier.metricValue(now);
+                    }
+                });
+            }
+        }
+
+        /**
+         * Add to this group an indicator metric that always returns the specified value.
+         *
+         * @param nameTemplate the name template for the metric; may not be null
+         * @param value        the value; may not be null
+         * @throws IllegalArgumentException if the name is not valid
+         */
+        public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
+            MetricName metricName = metricName(nameTemplate);
+            if (metrics().metric(metricName) == null) {
+                metrics().addMetric(metricName, new Gauge<T>() {
+                    @Override
+                    public T value(MetricConfig config, long now) {
+                        return value;
                     }
                 });
             }
@@ -369,7 +392,8 @@ public class ConnectMetrics {
         public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
             // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
             Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
-            if (result != null) sensorNames.add(result.name());
+            if (result != null)
+                sensorNames.add(result.name());
             return result;
         }
 
@@ -390,16 +414,17 @@ public class ConnectMetrics {
     }
 
     /**
-     * A simple functional interface that determines whether an indicator metric is true.
+     * A simple functional interface that returns a literal value.
      */
-    public interface IndicatorPredicate {
+    public interface LiteralSupplier<T> {
 
         /**
-         * Return whether the indicator metric is true.
+         * Return the literal value for the metric.
          *
-         * @return true if the indicator metric is satisfied, or false otherwise
+         * @param now the current time in milliseconds
+         * @return the literal metric value; may not be null
          */
-        boolean matches();
+        T metricValue(long now);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index ee513c9..d78576e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -32,16 +32,15 @@ public class ConnectMetricsRegistry {
     public static final String TASK_GROUP_NAME = "connector-task-metrics";
     public static final String SOURCE_TASK_GROUP_NAME = "source-task-metrics";
     public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
+    public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
+    public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics";
 
     private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
-    public final MetricNameTemplate connectorStatusRunning;
-    public final MetricNameTemplate connectorStatusPaused;
-    public final MetricNameTemplate connectorStatusFailed;
-    public final MetricNameTemplate taskStatusUnassigned;
-    public final MetricNameTemplate taskStatusRunning;
-    public final MetricNameTemplate taskStatusPaused;
-    public final MetricNameTemplate taskStatusFailed;
-    public final MetricNameTemplate taskStatusDestroyed;
+    public final MetricNameTemplate connectorStatus;
+    public final MetricNameTemplate connectorType;
+    public final MetricNameTemplate connectorClass;
+    public final MetricNameTemplate connectorVersion;
+    public final MetricNameTemplate taskStatus;
     public final MetricNameTemplate taskRunningRatio;
     public final MetricNameTemplate taskPauseRatio;
     public final MetricNameTemplate taskCommitTimeMax;
@@ -75,6 +74,25 @@ public class ConnectMetricsRegistry {
     public final MetricNameTemplate sinkRecordActiveCount;
     public final MetricNameTemplate sinkRecordActiveCountMax;
     public final MetricNameTemplate sinkRecordActiveCountAvg;
+    public final MetricNameTemplate connectorCount;
+    public final MetricNameTemplate taskCount;
+    public final MetricNameTemplate connectorStartupAttemptsTotal;
+    public final MetricNameTemplate connectorStartupSuccessTotal;
+    public final MetricNameTemplate connectorStartupSuccessPercentage;
+    public final MetricNameTemplate connectorStartupFailureTotal;
+    public final MetricNameTemplate connectorStartupFailurePercentage;
+    public final MetricNameTemplate taskStartupAttemptsTotal;
+    public final MetricNameTemplate taskStartupSuccessTotal;
+    public final MetricNameTemplate taskStartupSuccessPercentage;
+    public final MetricNameTemplate taskStartupFailureTotal;
+    public final MetricNameTemplate taskStartupFailurePercentage;
+    public final MetricNameTemplate leaderName;
+    public final MetricNameTemplate epoch;
+    public final MetricNameTemplate rebalanceCompletedTotal;
+    public final MetricNameTemplate rebalanceMode;
+    public final MetricNameTemplate rebalanceTimeMax;
+    public final MetricNameTemplate rebalanceTimeAvg;
+    public final MetricNameTemplate rebalanceTimeSinceLast;
 
     public ConnectMetricsRegistry() {
         this(new LinkedHashSet<String>());
@@ -85,28 +103,25 @@ public class ConnectMetricsRegistry {
         Set<String> connectorTags = new LinkedHashSet<>(tags);
         connectorTags.add(CONNECTOR_TAG_NAME);
 
-        connectorStatusRunning = createTemplate("status-running", CONNECTOR_GROUP_NAME,
-                                                "Signals whether the connector is in the running state.", connectorTags);
-        connectorStatusPaused = createTemplate("status-paused", CONNECTOR_GROUP_NAME,
-                                               "Signals whether the connector is in the paused state.", connectorTags);
-        connectorStatusFailed = createTemplate("status-failed", CONNECTOR_GROUP_NAME,
-                                               "Signals whether the connector is in the failed state.", connectorTags);
+        connectorStatus = createTemplate("status", CONNECTOR_GROUP_NAME,
+                                         "The status of the connector. One of 'unassigned', 'running', 'paused', 'failed', or " +
+                                         "'destroyed'.",
+                                         connectorTags);
+        connectorType = createTemplate("connector-type", CONNECTOR_GROUP_NAME, "The type of the connector. One of 'source' or 'sink'.",
+                                       connectorTags);
+        connectorClass = createTemplate("connector-class", CONNECTOR_GROUP_NAME, "The name of the connector class.", connectorTags);
+        connectorVersion = createTemplate("connector-version", CONNECTOR_GROUP_NAME,
+                                          "The version of the connector class, as reported by the connector.", connectorTags);
 
         /***** Worker task level *****/
         Set<String> workerTaskTags = new LinkedHashSet<>(tags);
         workerTaskTags.add(CONNECTOR_TAG_NAME);
         workerTaskTags.add(TASK_TAG_NAME);
 
-        taskStatusUnassigned = createTemplate("status-unassigned", TASK_GROUP_NAME, "Signals whether this task is in the unassigned state.",
-                                              workerTaskTags);
-        taskStatusRunning = createTemplate("status-running", TASK_GROUP_NAME, "Signals whether this task is in the running state.",
-                                           workerTaskTags);
-        taskStatusPaused = createTemplate("status-paused", TASK_GROUP_NAME, "Signals whether this task is in the paused state.",
-                                          workerTaskTags);
-        taskStatusFailed = createTemplate("status-failed", TASK_GROUP_NAME, "Signals whether this task is in the failed state.",
-                                          workerTaskTags);
-        taskStatusDestroyed = createTemplate("status-destroyed", TASK_GROUP_NAME, "Signals whether this task is in the destroyed state.",
-                                             workerTaskTags);
+        taskStatus = createTemplate("status", TASK_GROUP_NAME,
+                                    "The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or " +
+                                    "'destroyed'.",
+                                    workerTaskTags);
         taskRunningRatio = createTemplate("running-ratio", TASK_GROUP_NAME,
                                           "The fraction of time this task has spent in the running state.", workerTaskTags);
         taskPauseRatio = createTemplate("pause-ratio", TASK_GROUP_NAME, "The fraction of time this task has spent in the pause state.",
@@ -230,13 +245,55 @@ public class ConnectMetricsRegistry {
                                                "committed/flushed/acknowledged by the sink task.",
                                                sinkTaskTags);
         sinkRecordActiveCountMax = createTemplate("sink-record-active-count-max", SINK_TASK_GROUP_NAME,
-                                                  "The maximum number of records that have been read from Kafka but not yet completely " +
-                                                  "committed/flushed/acknowledged by the sink task.",
+                                                  "The maximum number of records that have been read from Kafka but not yet completely "
+                                                  + "committed/flushed/acknowledged by the sink task.",
                                                   sinkTaskTags);
         sinkRecordActiveCountAvg = createTemplate("sink-record-active-count-avg", SINK_TASK_GROUP_NAME,
-                                                  "The average number of records that have been read from Kafka but not yet completely " +
-                                                  "committed/flushed/acknowledged by the sink task.",
+                                                  "The average number of records that have been read from Kafka but not yet completely "
+                                                  + "committed/flushed/acknowledged by the sink task.",
                                                   sinkTaskTags);
+
+        /***** Worker level *****/
+        Set<String> workerTags = new LinkedHashSet<>(tags);
+
+        connectorCount = createTemplate("connector-count", WORKER_GROUP_NAME, "The number of connectors run in this worker.", workerTags);
+        taskCount = createTemplate("task-count", WORKER_GROUP_NAME, "The number of tasks run in this worker.", workerTags);
+        connectorStartupAttemptsTotal = createTemplate("connector-startup-attempts-total", WORKER_GROUP_NAME,
+                                                  "The total number of connector startups that this worker has attempted.", workerTags);
+        connectorStartupSuccessTotal = createTemplate("connector-startup-success-total", WORKER_GROUP_NAME,
+                                                 "The total number of connector starts that succeeded.", workerTags);
+        connectorStartupSuccessPercentage = createTemplate("connector-startup-success-percentage", WORKER_GROUP_NAME,
+                                                      "The average percentage of this worker's connectors starts that succeeded.", workerTags);
+        connectorStartupFailureTotal = createTemplate("connector-startup-failure-total", WORKER_GROUP_NAME,
+                                                 "The total number of connector starts that failed.", workerTags);
+        connectorStartupFailurePercentage = createTemplate("connector-startup-failure-percentage", WORKER_GROUP_NAME,
+                                                      "The average percentage of this worker's connectors starts that failed.", workerTags);
+        taskStartupAttemptsTotal = createTemplate("task-startup-attempts-total", WORKER_GROUP_NAME,
+                                                  "The total number of task startups that this worker has attempted.", workerTags);
+        taskStartupSuccessTotal = createTemplate("task-startup-success-total", WORKER_GROUP_NAME,
+                                                 "The total number of task starts that succeeded.", workerTags);
+        taskStartupSuccessPercentage = createTemplate("task-startup-success-percentage", WORKER_GROUP_NAME,
+                                                      "The average percentage of this worker's tasks starts that succeeded.", workerTags);
+        taskStartupFailureTotal = createTemplate("task-startup-failure-total", WORKER_GROUP_NAME,
+                                                 "The total number of task starts that failed.", workerTags);
+        taskStartupFailurePercentage = createTemplate("task-startup-failure-percentage", WORKER_GROUP_NAME,
+                                                      "The average percentage of this worker's tasks starts that failed.", workerTags);
+
+        /***** Worker rebalance level *****/
+        Set<String> rebalanceTags = new LinkedHashSet<>(tags);
+
+        leaderName = createTemplate("leader-name", WORKER_REBALANCE_GROUP_NAME, "The name of the group leader.", rebalanceTags);
+        epoch = createTemplate("epoch", WORKER_REBALANCE_GROUP_NAME, "The epoch or generation number of this worker.", rebalanceTags);
+        rebalanceCompletedTotal = createTemplate("completed-rebalances-total", WORKER_REBALANCE_GROUP_NAME,
+                                                "The total number of rebalances completed by this worker.", rebalanceTags);
+        rebalanceMode = createTemplate("rebalancing", WORKER_REBALANCE_GROUP_NAME,
+                                               "Whether this worker is currently rebalancing.", rebalanceTags);
+        rebalanceTimeMax = createTemplate("rebalance-max-time-ms", WORKER_REBALANCE_GROUP_NAME,
+                                          "The maximum time in milliseconds spent by this worker to rebalance.", rebalanceTags);
+        rebalanceTimeAvg = createTemplate("rebalance-avg-time-ms", WORKER_REBALANCE_GROUP_NAME,
+                                          "The average time in milliseconds spent by this worker to rebalance.", rebalanceTags);
+        rebalanceTimeSinceLast = createTemplate("time-since-last-rebalance-ms", WORKER_REBALANCE_GROUP_NAME,
+                                                "The time in milliseconds since this worker completed the most recent rebalance.", rebalanceTags);
     }
 
     private MetricNameTemplate createTemplate(String name, String group, String doc, Set<String> tags) {
@@ -272,4 +329,12 @@ public class ConnectMetricsRegistry {
     public String sourceTaskGroupName() {
         return SOURCE_TASK_GROUP_NAME;
     }
+
+    public String workerGroupName() {
+        return WORKER_GROUP_NAME;
+    }
+
+    public String workerRebalanceGroupName() {
+        return WORKER_REBALANCE_GROUP_NAME;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 56bc341..c6e2e17 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -18,12 +18,18 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -68,6 +74,7 @@ public class Worker {
     private final String workerId;
     private final Plugins plugins;
     private final ConnectMetrics metrics;
+    private final WorkerMetricsGroup workerMetricsGroup;
     private final WorkerConfig config;
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
@@ -91,6 +98,8 @@ public class Worker {
         this.time = time;
         this.plugins = plugins;
         this.config = config;
+        this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
+
         // Internal converters are required properties, thus getClass won't return null.
         this.internalKeyConverter = plugins.newConverter(
                 config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
@@ -164,6 +173,8 @@ public class Worker {
         metrics.stop();
 
         log.info("Worker stopped");
+
+        workerMetricsGroup.close();
     }
 
     /**
@@ -204,6 +215,7 @@ public class Worker {
             // Can't be put in a finally block because it needs to be swapped before the call on
             // statusListener
             Plugins.compareAndSwapLoaders(savedLoader);
+            workerMetricsGroup.recordConnectorStartupFailure();
             statusListener.onFailure(connName, t);
             return false;
         }
@@ -213,6 +225,7 @@ public class Worker {
             throw new ConnectException("Connector with name " + connName + " already exists");
 
         log.info("Finished creating connector {}", connName);
+        workerMetricsGroup.recordConnectorStartupSuccess();
         return true;
     }
 
@@ -396,6 +409,7 @@ public class Worker {
             // Can't be put in a finally block because it needs to be swapped before the call on
             // statusListener
             Plugins.compareAndSwapLoaders(savedLoader);
+            workerMetricsGroup.recordTaskFailure();
             statusListener.onFailure(id, t);
             return false;
         }
@@ -408,6 +422,7 @@ public class Worker {
         if (workerTask instanceof WorkerSourceTask) {
             sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
         }
+        workerMetricsGroup.recordTaskSuccess();
         return true;
     }
 
@@ -583,4 +598,100 @@ public class Worker {
             Plugins.compareAndSwapLoaders(savedLoader);
         }
     }
+
+    WorkerMetricsGroup workerMetricsGroup() {
+        return workerMetricsGroup;
+    }
+
+    class WorkerMetricsGroup {
+        private final MetricGroup metricGroup;
+        private final Sensor connectorStartupAttempts;
+        private final Sensor connectorStartupSuccesses;
+        private final Sensor connectorStartupFailures;
+        private final Sensor connectorStartupResults;
+        private final Sensor taskStartupAttempts;
+        private final Sensor taskStartupSuccesses;
+        private final Sensor taskStartupFailures;
+        private final Sensor taskStartupResults;
+
+        public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup = connectMetrics.group(registry.workerGroupName());
+
+            metricGroup.addValueMetric(registry.connectorCount, new LiteralSupplier<Double>() {
+                @Override
+                public Double metricValue(long now) {
+                    return (double) connectors.size();
+                }
+            });
+            metricGroup.addValueMetric(registry.taskCount, new LiteralSupplier<Double>() {
+                @Override
+                public Double metricValue(long now) {
+                    return (double) tasks.size();
+                }
+            });
+
+            MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+            MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+            Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+            connectorStartupResults = metricGroup.sensor("connector-startup-results");
+            connectorStartupResults.add(connectorStartupResultFrequencies);
+
+            connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+            connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total());
+
+            connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+            connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total());
+
+            connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+            connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new Total());
+
+            MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+            MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+            Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+            taskStartupResults = metricGroup.sensor("task-startup-results");
+            taskStartupResults.add(taskStartupResultFrequencies);
+
+            taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+            taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total());
+
+            taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+            taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new Total());
+
+            taskStartupFailures = metricGroup.sensor("task-startup-failures");
+            taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new Total());
+        }
+
+        void close() {
+            metricGroup.close();
+        }
+
+        void recordConnectorStartupFailure() {
+            connectorStartupAttempts.record(1.0);
+            connectorStartupFailures.record(1.0);
+            connectorStartupResults.record(0.0);
+        }
+
+        void recordConnectorStartupSuccess() {
+            connectorStartupAttempts.record(1.0);
+            connectorStartupSuccesses.record(1.0);
+            connectorStartupResults.record(1.0);
+        }
+
+        void recordTaskFailure() {
+            taskStartupAttempts.record(1.0);
+            taskStartupFailures.record(1.0);
+            taskStartupResults.record(0.0);
+        }
+
+        void recordTaskSuccess() {
+            taskStartupAttempts.record(1.0);
+            taskStartupSuccesses.record(1.0);
+            taskStartupResults.record(1.0);
+        }
+
+        protected MetricGroup metricGroup() {
+            return metricGroup;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 21104bd..9e65cd2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -16,16 +16,18 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Container for connectors which is responsible for managing their lifecycle (e.g. handling startup,
@@ -199,6 +201,18 @@ public class WorkerConnector {
         return SinkConnector.class.isAssignableFrom(connector.getClass());
     }
 
+    public boolean isSourceConnector() {
+        return SourceConnector.class.isAssignableFrom(connector.getClass());
+    }
+
+    protected String connectorType() {
+        if (isSinkConnector())
+            return "sink";
+        if (isSourceConnector())
+            return "source";
+        return "unknown";
+    }
+
     public Connector connector() {
         return connector;
     }
@@ -224,22 +238,23 @@ public class WorkerConnector {
         private final ConnectorStatus.Listener delegate;
 
         public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
+            Objects.requireNonNull(connectMetrics);
+            Objects.requireNonNull(connector);
+            Objects.requireNonNull(initialState);
+            Objects.requireNonNull(delegate);
             this.delegate = delegate;
             this.state = initialState;
             ConnectMetricsRegistry registry = connectMetrics.registry();
             this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
                     registry.connectorTagName(), connName);
 
-            addStateMetric(AbstractStatus.State.RUNNING, registry.connectorStatusRunning);
-            addStateMetric(AbstractStatus.State.PAUSED, registry.connectorStatusPaused);
-            addStateMetric(AbstractStatus.State.FAILED, registry.connectorStatusFailed);
-        }
-
-        private void addStateMetric(final AbstractStatus.State matchingState, MetricNameTemplate nameTemplate) {
-            metricGroup.addIndicatorMetric(nameTemplate, new IndicatorPredicate() {
+            metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
+            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
+            metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version());
+            metricGroup.addValueMetric(registry.connectorStatus, new LiteralSupplier<String>() {
                 @Override
-                public boolean matches() {
-                    return state == matchingState;
+                public String metricValue(long now) {
+                    return state.toString().toLowerCase(Locale.getDefault());
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 6499ac2..ec06924 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -26,13 +26,14 @@ import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.AbstractStatus.State;
-import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -313,11 +314,12 @@ abstract class WorkerTask implements Runnable {
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
 
-            addTaskStateMetric(State.UNASSIGNED, registry.taskStatusUnassigned);
-            addTaskStateMetric(State.RUNNING, registry.taskStatusRunning);
-            addTaskStateMetric(State.PAUSED, registry.taskStatusPaused);
-            addTaskStateMetric(State.FAILED, registry.taskStatusDestroyed);
-            addTaskStateMetric(State.DESTROYED, registry.taskStatusDestroyed);
+            metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() {
+                @Override
+                public String metricValue(long now) {
+                    return taskStateTimer.currentState().toString().toLowerCase(Locale.getDefault());
+                }
+            });
 
             addRatioMetric(State.RUNNING, registry.taskRunningRatio);
             addRatioMetric(State.PAUSED, registry.taskPauseRatio);
@@ -337,15 +339,6 @@ abstract class WorkerTask implements Runnable {
             commitAttempts.add(commitFrequencies);
         }
 
-        private void addTaskStateMetric(final State matchingState, MetricNameTemplate template) {
-            metricGroup.addIndicatorMetric(template, new IndicatorPredicate() {
-                @Override
-                public boolean matches() {
-                    return matchingState == taskStateTimer.currentState();
-                }
-            });
-        }
-
         private void addRatioMetric(final State matchingState, MetricNameTemplate template) {
             MetricName metricName = metricGroup.metricName(template);
             if (metricGroup.metrics().metric(metricName) == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 390f8c3..4d3d07b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -19,6 +19,10 @@ package org.apache.kafka.connect.runtime.distributed;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -28,6 +32,10 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
@@ -106,6 +114,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private final AtomicLong requestSeqNum = new AtomicLong();
 
     private final Time time;
+    private final HerderMetrics herderMetrics;
 
     private final String workerGroupId;
     private final int workerSyncTimeoutMs;
@@ -143,7 +152,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl) {
-        this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, time);
+        this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -155,10 +164,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       ConfigBackingStore configBackingStore,
                       WorkerGroupMember member,
                       String restUrl,
+                      ConnectMetrics metrics,
                       Time time) {
         super(worker, workerId, statusBackingStore, configBackingStore);
 
         this.time = time;
+        this.herderMetrics = new HerderMetrics(metrics);
         this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
@@ -202,6 +213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             halt();
 
             log.info("Herder stopped");
+            herderMetrics.close();
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
             Exit.exit(1);
@@ -781,6 +793,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // We only mark this as resolved once we've actually started work, which allows us to correctly track whether
         // what work is currently active and running. If we bail early, the main tick loop + having requested rejoin
         // guarantees we'll attempt to rejoin before executing this method again.
+        herderMetrics.rebalanceSucceeded(time.milliseconds());
         rebalanceResolved = true;
         return true;
     }
@@ -1163,6 +1176,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    protected HerderMetrics herderMetrics() {
+        return herderMetrics;
+    }
+
     // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
     public class RebalanceListener implements WorkerRebalanceListener {
         @Override
@@ -1177,6 +1194,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 DistributedHerder.this.assignment = assignment;
                 DistributedHerder.this.generation = generation;
                 rebalanceResolved = false;
+                herderMetrics.rebalanceStarted(time.milliseconds());
             }
 
             // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
@@ -1230,4 +1248,71 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    class HerderMetrics {
+        private final MetricGroup metricGroup;
+        private final Sensor rebalanceCompletedCounts;
+        private final Sensor rebalanceTime;
+        private volatile long lastRebalanceCompletedAtMillis = Long.MIN_VALUE;
+        private volatile boolean rebalancing = false;
+        private volatile long rebalanceStartedAtMillis = 0L;
+
+        public HerderMetrics(ConnectMetrics connectMetrics) {
+            ConnectMetricsRegistry registry = connectMetrics.registry();
+            metricGroup = connectMetrics.group(registry.workerRebalanceGroupName());
+
+            metricGroup.addValueMetric(registry.leaderName, new LiteralSupplier<String>() {
+                @Override
+                public String metricValue(long now) {
+                    return leaderUrl();
+                }
+            });
+            metricGroup.addValueMetric(registry.epoch, new LiteralSupplier<Double>() {
+                @Override
+                public Double metricValue(long now) {
+                    return (double) generation;
+                }
+            });
+            metricGroup.addValueMetric(registry.rebalanceMode, new LiteralSupplier<Double>() {
+                @Override
+                public Double metricValue(long now) {
+                    return rebalancing ? 1.0d : 0.0d;
+                }
+            });
+
+            rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count");
+            rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total());
+
+            rebalanceTime = metricGroup.sensor("rebalance-time");
+            rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());
+            rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeAvg), new Avg());
+
+            metricGroup.addValueMetric(registry.rebalanceTimeSinceLast, new LiteralSupplier<Double>() {
+                @Override
+                public Double metricValue(long now) {
+                    return lastRebalanceCompletedAtMillis == Long.MIN_VALUE ? Double.POSITIVE_INFINITY : (double) (now - lastRebalanceCompletedAtMillis);
+                }
+            });
+        }
+
+        void close() {
+            metricGroup.close();
+        }
+
+        void rebalanceStarted(long now) {
+            rebalanceStartedAtMillis = now;
+            rebalancing = true;
+        }
+
+        void rebalanceSucceeded(long now) {
+            long duration = Math.max(0L, now - rebalanceStartedAtMillis);
+            rebalancing = false;
+            rebalanceCompletedCounts.record(1.0);
+            rebalanceTime.record(duration);
+            lastRebalanceCompletedAtMillis = now;
+        }
+
+        protected MetricGroup metricGroup() {
+            return metricGroup;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 6cc6db7..f1df140 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -56,6 +56,10 @@ public class MockConnectMetrics extends ConnectMetrics {
         this(new MockTime());
     }
 
+    public MockConnectMetrics(org.apache.kafka.common.utils.MockTime time) {
+        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
+    }
+
     public MockConnectMetrics(MockTime time) {
         super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
     }
@@ -73,28 +77,81 @@ public class MockConnectMetrics extends ConnectMetrics {
      * @param name        the name of the metric
      * @return the current value of the metric
      */
-    public double currentMetricValue(MetricGroup metricGroup, String name) {
+    public Object currentMetricValue(MetricGroup metricGroup, String name) {
+        return currentMetricValue(this, metricGroup, name);
+    }
+
+    /**
+     * Get the current value of the named metric, which may have already been removed from the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+     *
+     * @param metricGroup the metric metricGroup that contained the metric
+     * @param name        the name of the metric
+     * @return the current value of the metric
+     */
+    public double currentMetricValueAsDouble(MetricGroup metricGroup, String name) {
+        Object value = currentMetricValue(metricGroup, name);
+        return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN;
+    }
+
+    /**
+     * Get the current value of the named metric, which may have already been removed from the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+     *
+     * @param metricGroup the metric metricGroup that contained the metric
+     * @param name        the name of the metric
+     * @return the current value of the metric
+     */
+    public String currentMetricValueAsString(MetricGroup metricGroup, String name) {
+        Object value = currentMetricValue(metricGroup, name);
+        return value instanceof String ? (String) value : null;
+    }
+
+    /**
+     * Get the current value of the named metric, which may have already been removed from the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+     *
+     * @param metrics the {@link ConnectMetrics} instance
+     * @param metricGroup the metric metricGroup that contained the metric
+     * @param name        the name of the metric
+     * @return the current value of the metric
+     */
+    public static Object currentMetricValue(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
         MetricName metricName = metricGroup.metricName(name);
-        for (MetricsReporter reporter : metrics().reporters()) {
+        for (MetricsReporter reporter : metrics.metrics().reporters()) {
             if (reporter instanceof MockMetricsReporter) {
                 return ((MockMetricsReporter) reporter).currentMetricValue(metricName);
             }
         }
-        return Double.NEGATIVE_INFINITY;
+        return null;
     }
 
     /**
-     * Determine if the {@link KafkaMetric} with the specified name exists within the
-     * {@link org.apache.kafka.common.metrics.Metrics} instance.
+     * Get the current value of the named metric, which may have already been removed from the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
      *
+     * @param metrics the {@link ConnectMetrics} instance
      * @param metricGroup the metric metricGroup that contained the metric
      * @param name        the name of the metric
-     * @return true if the metric is still register, or false if it has been removed
+     * @return the current value of the metric
      */
-    public boolean metricExists(MetricGroup metricGroup, String name) {
-        MetricName metricName = metricGroup.metricName(name);
-        KafkaMetric metric = metricGroup.metrics().metric(metricName);
-        return metric != null;
+    public static double currentMetricValueAsDouble(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
+        Object value = currentMetricValue(metrics, metricGroup, name);
+        return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN;
+    }
+
+    /**
+     * Get the current value of the named metric, which may have already been removed from the
+     * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+     *
+     * @param metrics the {@link ConnectMetrics} instance
+     * @param metricGroup the metric metricGroup that contained the metric
+     * @param name        the name of the metric
+     * @return the current value of the metric
+     */
+    public static String currentMetricValueAsString(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
+        Object value = currentMetricValue(metrics, metricGroup, name);
+        return value instanceof String ? (String) value : null;
     }
 
     public static class MockMetricsReporter implements MetricsReporter {
@@ -136,10 +193,9 @@ public class MockConnectMetrics extends ConnectMetrics {
          * @param metricName the name of the metric that was registered most recently
          * @return the current value of the metric
          */
-        @SuppressWarnings("deprecation")
-        public double currentMetricValue(MetricName metricName) {
+        public Object currentMetricValue(MetricName metricName) {
             KafkaMetric metric = metricsByName.get(metricName);
-            return metric != null ? metric.value() : Double.NaN;
+            return metric != null ? metric.metricValue() : null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 5f03f5a..10c413d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -32,12 +33,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
 public class WorkerConnectorTest extends EasyMockSupport {
 
+    private static final String VERSION = "1.1";
     public static final String CONNECTOR = "connector";
     public static final Map<String, String> CONFIG = new HashMap<>();
     static {
@@ -45,7 +49,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
         CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
     }
     public ConnectorConfig connectorConfig;
-    public ConnectMetrics metrics;
+    public MockConnectMetrics metrics;
 
     @Mock Plugins plugins;
     @Mock Connector connector;
@@ -67,6 +71,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public void testInitializeFailure() {
         RuntimeException exception = new RuntimeException();
 
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall().andThrow(exception);
 
@@ -92,6 +99,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public void testFailureIsFinalState() {
         RuntimeException exception = new RuntimeException();
 
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall().andThrow(exception);
 
@@ -119,6 +129,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupAndShutdown() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -150,6 +163,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupAndPause() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -186,6 +202,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testOnResume() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -222,6 +241,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupPaused() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -251,6 +273,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public void testStartupFailure() {
         RuntimeException exception = new RuntimeException();
 
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -281,6 +306,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public void testShutdownFailure() {
         RuntimeException exception = new RuntimeException();
 
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -312,6 +340,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testTransitionStartedToStarted() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -346,6 +377,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testTransitionPausedToPaused() {
+        connector.version();
+        expectLastCall().andReturn(VERSION);
+
         connector.initialize(EasyMock.notNull(ConnectorContext.class));
         expectLastCall();
 
@@ -415,6 +449,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
         assertFalse(workerConnector.metrics().isFailed());
         assertFalse(workerConnector.metrics().isPaused());
         assertFalse(workerConnector.metrics().isRunning());
+        MetricGroup metricGroup = workerConnector.metrics().metricGroup();
+        String status = metrics.currentMetricValueAsString(metricGroup, "status");
+        String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
+        String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
+        String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
+        assertEquals(type, "unknown");
+        assertNotNull(clazz);
+        assertEquals(VERSION, version);
     }
 
     private static abstract class TestConnector extends Connector {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 782d66b..50b091d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -180,8 +180,7 @@ public class WorkerSinkTaskTest {
         time.sleep(10000L);
 
         assertSinkMetricValue("partition-count", 2);
-        assertTaskMetricValue("status-running", 0.0);
-        assertTaskMetricValue("status-paused", 1.0);
+        assertTaskMetricValue("status", "paused");
         assertTaskMetricValue("running-ratio", 0.0);
         assertTaskMetricValue("pause-ratio", 1.0);
         assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
@@ -253,8 +252,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
         assertSinkMetricValue("offset-commit-skip-rate", 0.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 1.0);
@@ -272,8 +270,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
         assertSinkMetricValue("offset-commit-skip-rate", 0.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 0.0);
-        assertTaskMetricValue("status-paused", 1.0);
+        assertTaskMetricValue("status", "paused");
         assertTaskMetricValue("running-ratio", 0.25);
         assertTaskMetricValue("pause-ratio", 0.75);
 
@@ -333,8 +330,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
         assertSinkMetricValue("offset-commit-skip-rate", 0.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 0.0);
@@ -352,8 +348,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("sink-record-active-count", 1.0);
         assertSinkMetricValue("sink-record-active-count-max", 1.0);
         assertSinkMetricValue("sink-record-active-count-avg", 0.5);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("batch-size-max", 1.0);
         assertTaskMetricValue("batch-size-avg", 0.5);
@@ -492,8 +487,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 1.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 1.0);
@@ -560,8 +554,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 0.0);
         assertSinkMetricValue("offset-commit-completion-total", 0.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 1.0);
@@ -588,8 +581,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 1.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
         assertSinkMetricValue("offset-commit-skip-total", 0.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 1.0);
@@ -991,8 +983,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 2.0);
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
         assertSinkMetricValue("offset-commit-skip-total", 1.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 2.0);
@@ -1026,8 +1017,7 @@ public class WorkerSinkTaskTest {
         assertSinkMetricValue("offset-commit-seq-no", 3.0);
         assertSinkMetricValue("offset-commit-completion-total", 2.0);
         assertSinkMetricValue("offset-commit-skip-total", 1.0);
-        assertTaskMetricValue("status-running", 1.0);
-        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("status", "running");
         assertTaskMetricValue("running-ratio", 1.0);
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 2.0);
@@ -1089,7 +1079,7 @@ public class WorkerSinkTaskTest {
         assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
         assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
         assertEquals(0, workerTask.commitFailures());
-        assertEquals(1.0, metrics.currentMetricValue(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
+        assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
 
         PowerMock.verifyAll();
     }
@@ -1289,16 +1279,22 @@ public class WorkerSinkTaskTest {
 
     private void assertSinkMetricValue(String name, double expected) {
         MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
-        double measured = metrics.currentMetricValue(sinkTaskGroup, name);
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
         assertEquals(expected, measured, 0.001d);
     }
 
     private void assertTaskMetricValue(String name, double expected) {
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
-        double measured = metrics.currentMetricValue(taskGroup, name);
+        double measured = metrics.currentMetricValueAsDouble(taskGroup, name);
         assertEquals(expected, measured, 0.001d);
     }
 
+    private void assertTaskMetricValue(String name, String expected) {
+        MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+        String measured = metrics.currentMetricValueAsString(taskGroup, name);
+        assertEquals(expected, measured);
+    }
+
     private void printMetrics() {
         System.out.println("");
         sinkMetricValue("sink-record-read-rate");
@@ -1334,14 +1330,14 @@ public class WorkerSinkTaskTest {
 
     private double sinkMetricValue(String metricName) {
         MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
-        double value = metrics.currentMetricValue(sinkTaskGroup, metricName);
+        double value = metrics.currentMetricValueAsDouble(sinkTaskGroup, metricName);
         System.out.println("** " + metricName + "=" + value);
         return value;
     }
 
     private double taskMetricValue(String metricName) {
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
-        double value = metrics.currentMetricValue(taskGroup, metricName);
+        double value = metrics.currentMetricValueAsDouble(taskGroup, metricName);
         System.out.println("** " + metricName + "=" + value);
         return value;
     }
@@ -1350,10 +1346,10 @@ public class WorkerSinkTaskTest {
     private void assertMetrics(int minimumPollCountExpected) {
         MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
-        double readRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-rate");
-        double readTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-total");
-        double sendRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-rate");
-        double sendTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-total");
+        double readRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-rate");
+        double readTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-total");
+        double sendRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-rate");
+        double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total");
     }
 
     private abstract static class TestSinkTask extends SinkTask  {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 928ccb9..4f0d243 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -615,12 +615,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             group.recordPoll(100, 1000 + i * 100);
             group.recordWrite(10);
         }
-        assertEquals(1900.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
-        assertEquals(1450.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
-        assertEquals(33.333, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-rate"), 0.001d);
-        assertEquals(1000, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-total"), 0.001d);
-        assertEquals(3.3333, metrics.currentMetricValue(group.metricGroup(), "source-record-write-rate"), 0.001d);
-        assertEquals(100, metrics.currentMetricValue(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
     }
 
     private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
@@ -795,19 +795,19 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private void assertPollMetrics(int minimumPollCountExpected) {
         MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
-        double pollRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-rate");
-        double pollTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-total");
+        double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate");
+        double pollTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-total");
         if (minimumPollCountExpected > 0) {
-            assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-max"), 0.000001d);
-            assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-avg"), 0.000001d);
+            assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 0.000001d);
+            assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d);
             assertTrue(pollRate > 0.0d);
         } else {
             assertTrue(pollRate == 0.0d);
         }
         assertTrue(pollTotal >= minimumPollCountExpected);
 
-        double writeRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-rate");
-        double writeTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-total");
+        double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate");
+        double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total");
         if (minimumPollCountExpected > 0) {
             assertTrue(writeRate > 0.0d);
         } else {
@@ -815,14 +815,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         }
         assertTrue(writeTotal >= minimumPollCountExpected);
 
-        double pollBatchTimeMax = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-max-time-ms");
-        double pollBatchTimeAvg = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-avg-time-ms");
+        double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms");
+        double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms");
         if (minimumPollCountExpected > 0) {
             assertTrue(pollBatchTimeMax >= 0.0d);
         }
         assertTrue(pollBatchTimeAvg >= 0.0d);
-        double activeCount = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count");
-        double activeCountMax = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count-max");
+        double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
+        double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
         assertEquals(0, activeCount, 0.000001d);
         if (minimumPollCountExpected > 0) {
             assertEquals(RECORDS.size(), activeCountMax, 0.000001d);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 96746a5..516b71a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -314,8 +314,8 @@ public class WorkerTaskTest {
         long totalTime = 27000L;
         double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
         double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime;
-        assertEquals(pauseTimeRatio, metrics.currentMetricValue(group.metricGroup(), "pause-ratio"), 0.000001d);
-        assertEquals(runningTimeRatio, metrics.currentMetricValue(group.metricGroup(), "running-ratio"), 0.000001d);
+        assertEquals(pauseTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "pause-ratio"), 0.000001d);
+        assertEquals(runningTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "running-ratio"), 0.000001d);
     }
 
     private static abstract class TestSinkTask extends SinkTask {