You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/09/27 05:23:43 UTC

[2/2] kafka git commit: KAFKA-5900: Add task metrics common to both sink and source tasks

KAFKA-5900: Add task metrics common to both sink and source tasks

Added metrics that are common to both sink and source tasks.

Marked as "**WIP**" since this PR is built upon #3864, and will need to be rebased once that has been merged into `trunk`. However, I would still appreciate initial reviews since this PR is largely additive.

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #3911 from rhauch/kafka-5900


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73cc4166
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73cc4166
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73cc4166

Branch: refs/heads/trunk
Commit: 73cc416664dbc8e1442f70cb3c4cd8f4d365ea50
Parents: 8256f88
Author: Randall Hauch <rh...@gmail.com>
Authored: Tue Sep 26 22:23:37 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Sep 26 22:23:37 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/metrics/stats/Frequencies.java | 192 ++++++++++++++++
 .../kafka/common/metrics/stats/Frequency.java   |  59 +++++
 .../kafka/common/metrics/stats/Histogram.java   | 115 +++++++---
 .../kafka/common/metrics/stats/Percentiles.java |   1 +
 .../common/metrics/stats/FrequenciesTest.java   | 159 ++++++++++++++
 .../common/metrics/stats/HistogramTest.java     |  86 +++++++-
 .../kafka/connect/runtime/ConnectMetrics.java   | 202 +++++++++++++++--
 .../kafka/connect/runtime/StateTracker.java     | 173 +++++++++++++++
 .../apache/kafka/connect/runtime/Worker.java    |   4 +-
 .../kafka/connect/runtime/WorkerConnector.java  |   7 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   |  13 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  22 +-
 .../kafka/connect/runtime/WorkerTask.java       | 220 ++++++++++++++++++-
 .../connect/runtime/ConnectMetricsTest.java     |  52 +++++
 .../connect/runtime/MockConnectMetrics.java     |  42 ++++
 .../connect/runtime/MockConnectorMetrics.java   |  42 ----
 .../kafka/connect/runtime/StateTrackerTest.java | 100 +++++++++
 .../connect/runtime/WorkerConnectorTest.java    |   2 +-
 .../connect/runtime/WorkerSinkTaskTest.java     |  47 +++-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  12 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  12 +-
 .../kafka/connect/runtime/WorkerTaskTest.java   | 159 +++++++++++++-
 .../kafka/connect/runtime/WorkerTest.java       |  11 +
 23 files changed, 1607 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
new file mode 100644
index 0000000..52178d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
+import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link CompoundStat} that represents a normalized distribution with a {@link Frequency} metric for each
+ * bucketed value. The values of the {@link Frequency} metrics specify the frequency of the center value appearing
+ * relative to the total number of values recorded.
+ * <p>
+ * For example, consider a component that records failure or success of an operation using boolean values, with
+ * one metric to capture the percentage of operations that failed and another to capture the percentage of operations
+ * that succeeded.
+ * <p>
+ * This can be accomplished by creating a {@link org.apache.kafka.common.metrics.Sensor Sensor} to record the values,
+ * with 0.0 for false and 1.0 for true. Then, create a single {@link Frequencies} object that has two
+ * {@link Frequency} metrics: one centered around 0.0 and another centered around 1.0. The {@link Frequencies}
+ * object is a {@link CompoundStat}, and so it can be {@link org.apache.kafka.common.metrics.Sensor#add(CompoundStat)
+ * added directly to a Sensor} so the metrics are created automatically.
+ */
+public class Frequencies extends SampledStat implements CompoundStat {
+
+    /**
+     * Create a Frequencies instance with metrics for the frequency of a boolean sensor that records 0.0 for
+     * false and 1.0 for true.
+     *
+     * @param falseMetricName the name of the metric capturing the frequency of failures; may be null if not needed
+     * @param trueMetricName  the name of the metric capturing the frequency of successes; may be null if not needed
+     * @return the Frequencies instance; never null
+     * @throws IllegalArgumentException if both {@code falseMetricName} and {@code trueMetricName} are null
+     */
+    public static Frequencies forBooleanValues(MetricName falseMetricName, MetricName trueMetricName) {
+        List<Frequency> frequencies = new ArrayList<>();
+        if (falseMetricName != null) {
+            frequencies.add(new Frequency(falseMetricName, 0.0));
+        }
+        if (trueMetricName != null) {
+            frequencies.add(new Frequency(trueMetricName, 1.0));
+        }
+        if (frequencies.isEmpty()) {
+            throw new IllegalArgumentException("Must specify at least one metric name");
+        }
+        Frequency[] frequencyArray = frequencies.toArray(new Frequency[frequencies.size()]);
+        return new Frequencies(2, 0.0, 1.0, frequencyArray);
+    }
+
+    private final Frequency[] frequencies;
+    private final BinScheme binScheme;
+
+    /**
+     * Create a Frequencies that captures the values in the specified range into the given number of buckets,
+     * where the buckets are centered around the minimum, maximum, and intermediate values.
+     *
+     * @param buckets     the number of buckets
+     * @param min         the minimum value to be captured
+     * @param max         the maximum value to be captured
+     * @param frequencies the list of {@link Frequency} metrics, of which there should be at most one per bucket,
+     *                    centered on the bucket's value, though not every bucket needs to correspond to a metric
+     *                    if the value is not needed
+     * @throws IllegalArgumentException if any of the {@link Frequency} objects do not have a
+     *                                  {@link Frequency#centerValue() center value} within the specified range
+     */
+    public Frequencies(int buckets, double min, double max, Frequency... frequencies) {
+        super(0.0); // initial value is unused by this implementation
+        if (max < min) {
+            throw new IllegalArgumentException("The maximum value " + max
+                                                       + " must be greater than the minimum value " + min);
+        }
+        if (buckets < 1) {
+            throw new IllegalArgumentException("Must be at least 2 buckets");
+        }
+        if (buckets < frequencies.length) {
+            throw new IllegalArgumentException("More frequencies than buckets");
+        }
+        this.frequencies = frequencies;
+        for (Frequency freq : frequencies) {
+            if (min > freq.centerValue() || max < freq.centerValue()) {
+                throw new IllegalArgumentException("The frequency centered at '" + freq.centerValue()
+                                                           + "' is not within the range [" + min + "," + max + "]");
+            }
+        }
+        double halfBucketWidth = (max - min) / (buckets - 1) / 2.0;
+        this.binScheme = new ConstantBinScheme(buckets, min - halfBucketWidth, max + halfBucketWidth);
+    }
+
+    @Override
+    public List<NamedMeasurable> stats() {
+        List<NamedMeasurable> ms = new ArrayList<>(frequencies.length);
+        for (Frequency frequency : frequencies) {
+            final double center = frequency.centerValue();
+            ms.add(new NamedMeasurable(frequency.name(), new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return frequency(config, now, center);
+                }
+            }));
+        }
+        return ms;
+    }
+
+    /**
+     * Return the computed frequency describing the number of occurrences of the values in the bucket for the given
+     * center point, relative to the total number of occurrences in the samples.
+     *
+     * @param config      the metric configuration
+     * @param now         the current time in milliseconds
+     * @param centerValue the value corresponding to the center point of the bucket
+     * @return the frequency of the values in the bucket relative to the total number of samples
+     */
+    public double frequency(MetricConfig config, long now, double centerValue) {
+        purgeObsoleteSamples(config, now);
+        long totalCount = 0;
+        for (Sample sample : samples) {
+            totalCount += sample.eventCount;
+        }
+        if (totalCount == 0) {
+            return 0.0d;
+        }
+        // Add up all of the counts in the bin corresponding to the center value
+        float count = 0.0f;
+        int binNum = binScheme.toBin(centerValue);
+        for (Sample s : samples) {
+            HistogramSample sample = (HistogramSample) s;
+            float[] hist = sample.histogram.counts();
+            count += hist[binNum];
+        }
+        // Compute the ratio of counts to total counts
+        return count / (double) totalCount;
+    }
+
+    double totalCount() {
+        long count = 0;
+        for (Sample sample : samples) {
+            count += sample.eventCount;
+        }
+        return count;
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        return totalCount();
+    }
+
+    @Override
+    protected HistogramSample newSample(long timeMs) {
+        return new HistogramSample(binScheme, timeMs);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+        HistogramSample hist = (HistogramSample) sample;
+        hist.histogram.record(value);
+    }
+
+    private static class HistogramSample extends SampledStat.Sample {
+
+        private final Histogram histogram;
+
+        private HistogramSample(BinScheme scheme, long now) {
+            super(0.0, now);
+            histogram = new Histogram(scheme);
+        }
+
+        @Override
+        public void reset(long now) {
+            super.reset(now);
+            histogram.clear();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
new file mode 100644
index 0000000..116d0c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+
+import org.apache.kafka.common.MetricName;
+
+/**
+ * Definition of a frequency metric used in a {@link Frequencies} compound statistic.
+ */
+public class Frequency {
+
+    private final MetricName name;
+    private final double centerValue;
+
+    /**
+     * Create an instance with the given name and center point value.
+     *
+     * @param name        the name of the frequency metric; may not be null
+     * @param centerValue the value identifying the {@link Frequencies} bucket to be reported
+     */
+    public Frequency(MetricName name, double centerValue) {
+        this.name = name;
+        this.centerValue = centerValue;
+    }
+
+    /**
+     * Get the name of this metric.
+     *
+     * @return the metric name; never null
+     */
+    public MetricName name() {
+        return this.name;
+    }
+
+    /**
+     * Get the value of this metric's center point.
+     *
+     * @return the center point value
+     */
+    public double centerValue() {
+        return this.centerValue;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
index 3b1426e..af2b064 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
@@ -30,12 +30,16 @@ public class Histogram {
 
     public void record(double value) {
         this.hist[binScheme.toBin(value)] += 1.0f;
-        this.count += 1.0f;
+        this.count += 1.0d;
     }
 
     public double value(double quantile) {
         if (count == 0.0d)
             return Double.NaN;
+        if (quantile > 1.00d)
+            return Float.POSITIVE_INFINITY;
+        if (quantile < 0.00d)
+            return Float.NEGATIVE_INFINITY;
         float sum = 0.0f;
         float quant = (float) quantile;
         for (int i = 0; i < this.hist.length - 1; i++) {
@@ -43,7 +47,7 @@ public class Histogram {
             if (sum / count > quant)
                 return binScheme.fromBin(i);
         }
-        return Float.POSITIVE_INFINITY;
+        return binScheme.fromBin(this.hist.length - 1);
     }
 
     public float[] counts() {
@@ -67,32 +71,70 @@ public class Histogram {
         }
         b.append(Float.POSITIVE_INFINITY);
         b.append(':');
-        b.append(this.hist[this.hist.length - 1]);
+        b.append(String.format("%.0f", this.hist[this.hist.length - 1]));
         b.append('}');
         return b.toString();
     }
 
+    /**
+     * An algorithm for determining the bin in which a value is to be placed as well as calculating the upper end
+     * of each bin.
+     */
     public interface BinScheme {
-        public int bins();
 
-        public int toBin(double value);
-
-        public double fromBin(int bin);
+        /**
+         * Get the number of bins.
+         *
+         * @return the number of bins
+         */
+        int bins();
+
+        /**
+         * Determine the 0-based bin number in which the supplied value should be placed.
+         *
+         * @param value the value
+         * @return the 0-based index of the bin
+         */
+        int toBin(double value);
+
+        /**
+         * Determine the value at the upper range of the specified bin.
+         *
+         * @param bin the 0-based bin number
+         * @return the value at the upper end of the bin; or {@link Float#NEGATIVE_INFINITY negative infinity}
+         * if the bin number is negative or {@link Float#POSITIVE_INFINITY positive infinity} if the 0-based
+         * bin number is greater than or equal to the {@link #bins() number of bins}.
+         */
+        double fromBin(int bin);
     }
 
+    /**
+     * A scheme for calculating the bins where the width of each bin is a constant determined by the range of values
+     * and the number of bins.
+     */
     public static class ConstantBinScheme implements BinScheme {
+        private static final int MIN_BIN_NUMBER = 0;
         private final double min;
         private final double max;
         private final int bins;
         private final double bucketWidth;
-
+        private final int maxBinNumber;
+
+        /**
+         * Create a bin scheme with the specified number of bins that all have the same width.
+         *
+         * @param bins the number of bins; must be at least 2
+         * @param min the minimum value to be counted in the bins
+         * @param max the maximum value to be counted in the bins
+         */
         public ConstantBinScheme(int bins, double min, double max) {
             if (bins < 2)
                 throw new IllegalArgumentException("Must have at least 2 bins.");
             this.min = min;
             this.max = max;
             this.bins = bins;
-            this.bucketWidth = (max - min) / (bins - 2);
+            this.bucketWidth = (max - min) / bins;
+            this.maxBinNumber = bins - 1;
         }
 
         public int bins() {
@@ -100,33 +142,49 @@ public class Histogram {
         }
 
         public double fromBin(int b) {
-            if (b == 0)
-                return Double.NEGATIVE_INFINITY;
-            else if (b == bins - 1)
-                return Double.POSITIVE_INFINITY;
-            else
-                return min + (b - 1) * bucketWidth;
+            if (b < MIN_BIN_NUMBER) {
+                return Float.NEGATIVE_INFINITY;
+            }
+            if (b > maxBinNumber) {
+                return Float.POSITIVE_INFINITY;
+            }
+            return min + b * bucketWidth;
         }
 
         public int toBin(double x) {
-            if (x < min)
-                return 0;
-            else if (x > max)
-                return bins - 1;
-            else
-                return (int) ((x - min) / bucketWidth) + 1;
+            int binNumber = (int) ((x - min) / bucketWidth);
+            if (binNumber < MIN_BIN_NUMBER) {
+                return MIN_BIN_NUMBER;
+            }
+            if (binNumber > maxBinNumber) {
+                return maxBinNumber;
+            }
+            return binNumber;
         }
     }
 
+    /**
+     * A scheme for calculating the bins where the width of each bin is one more than the previous bin, and therefore
+     * the bin widths are increasing at a linear rate. However, the bin widths are scaled such that the specified range
+     * of values will all fit within the bins (e.g., the upper range of the last bin is equal to the maximum value).
+     */
     public static class LinearBinScheme implements BinScheme {
         private final int bins;
         private final double max;
         private final double scale;
 
+        /**
+         * Create a linear bin scheme with the specified number of bins and the maximum value to be counted in the bins.
+         *
+         * @param numBins the number of bins; must be at least 2
+         * @param max the maximum value to be counted in the bins
+         */
         public LinearBinScheme(int numBins, double max) {
+            if (numBins < 2)
+                throw new IllegalArgumentException("Must have at least 2 bins.");
             this.bins = numBins;
             this.max = max;
-            int denom = numBins * (numBins - 1) / 2;
+            double denom = numBins * (numBins - 1.0) / 2.0;
             this.scale = max / denom;
         }
 
@@ -135,11 +193,12 @@ public class Histogram {
         }
 
         public double fromBin(int b) {
-            if (b == this.bins - 1) {
+            if (b > this.bins - 1) {
                 return Float.POSITIVE_INFINITY;
+            } else if (b < 0.0000d) {
+                return Float.NEGATIVE_INFINITY;
             } else {
-                double unscaled = (b * (b + 1.0)) / 2.0;
-                return unscaled * this.scale;
+                return this.scale * (b * (b + 1.0)) / 2.0;
             }
         }
 
@@ -149,10 +208,8 @@ public class Histogram {
             } else if (x > this.max) {
                 return this.bins - 1;
             } else {
-                double scaled = x / this.scale;
-                return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
+                return (int) (-0.5 + 0.5 * Math.sqrt(1.0 + 8.0 * x / this.scale));
             }
         }
     }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 80236e9..e970f48 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -93,6 +93,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
         return Double.POSITIVE_INFINITY;
     }
 
+    @Override
     public double combine(List<Sample> samples, MetricConfig config, long now) {
         return value(config, now, 0.5);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
new file mode 100644
index 0000000..9db18e2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class FrequenciesTest {
+
+    private static final double DELTA = 0.0001d;
+    private MetricConfig config;
+    private Time time;
+    private Metrics metrics;
+
+    @Before
+    public void setup() {
+        config = new MetricConfig().eventWindow(50).samples(2);
+        time = new MockTime();
+        metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+    }
+
+    @After
+    public void tearDown() {
+        metrics.close();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFrequencyCenterValueAboveMax() {
+        new Frequencies(4, 1.0, 4.0,
+                        freq("1", 1.0), freq("2", 20.0));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFrequencyCenterValueBelowMin() {
+        new Frequencies(4, 1.0, 4.0,
+                        freq("1", 1.0), freq("2", -20.0));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMoreFrequencyParametersThanBuckets() {
+        new Frequencies(1, 1.0, 4.0,
+                        freq("1", 1.0), freq("2", -20.0));
+    }
+
+    @Test
+    public void testBooleanFrequencies() {
+        MetricName metricTrue = name("true");
+        MetricName metricFalse = name("false");
+        Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
+        final NamedMeasurable falseMetric = frequencies.stats().get(0);
+        final NamedMeasurable trueMetric = frequencies.stats().get(1);
+
+        // Record 2 windows worth of values
+        for (int i = 0; i != 25; ++i) {
+            frequencies.record(config, 0.0, time.milliseconds());
+        }
+        for (int i = 0; i != 75; ++i) {
+            frequencies.record(config, 1.0, time.milliseconds());
+        }
+        assertEquals(0.25, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
+        assertEquals(0.75, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
+
+        // Record 2 more windows worth of values
+        for (int i = 0; i != 40; ++i) {
+            frequencies.record(config, 0.0, time.milliseconds());
+        }
+        for (int i = 0; i != 60; ++i) {
+            frequencies.record(config, 1.0, time.milliseconds());
+        }
+        assertEquals(0.40, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
+        assertEquals(0.60, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
+    }
+
+    @Test
+    public void testUseWithMetrics() {
+        MetricName name1 = name("1");
+        MetricName name2 = name("2");
+        MetricName name3 = name("3");
+        MetricName name4 = name("4");
+        Frequencies frequencies = new Frequencies(4, 1.0, 4.0,
+                                                  new Frequency(name1, 1.0),
+                                                  new Frequency(name2, 2.0),
+                                                  new Frequency(name3, 3.0),
+                                                  new Frequency(name4, 4.0));
+        Sensor sensor = metrics.sensor("test", config);
+        sensor.add(frequencies);
+        Metric metric1 = this.metrics.metrics().get(name1);
+        Metric metric2 = this.metrics.metrics().get(name2);
+        Metric metric3 = this.metrics.metrics().get(name3);
+        Metric metric4 = this.metrics.metrics().get(name4);
+
+        // Record 2 windows worth of values
+        for (int i = 0; i != 100; ++i) {
+            frequencies.record(config, i % 4 + 1, time.milliseconds());
+        }
+        assertEquals(0.25, metric1.value(), DELTA);
+        assertEquals(0.25, metric2.value(), DELTA);
+        assertEquals(0.25, metric3.value(), DELTA);
+        assertEquals(0.25, metric4.value(), DELTA);
+
+        // Record 2 windows worth of values
+        for (int i = 0; i != 100; ++i) {
+            frequencies.record(config, i % 2 + 1, time.milliseconds());
+        }
+        assertEquals(0.50, metric1.value(), DELTA);
+        assertEquals(0.50, metric2.value(), DELTA);
+        assertEquals(0.00, metric3.value(), DELTA);
+        assertEquals(0.00, metric4.value(), DELTA);
+
+        // Record 1 window worth of values to overlap with the last window
+        // that is half 1.0 and half 2.0
+        for (int i = 0; i != 50; ++i) {
+            frequencies.record(config, 4.0, time.milliseconds());
+        }
+        assertEquals(0.25, metric1.value(), DELTA);
+        assertEquals(0.25, metric2.value(), DELTA);
+        assertEquals(0.00, metric3.value(), DELTA);
+        assertEquals(0.50, metric4.value(), DELTA);
+    }
+
+    protected MetricName name(String metricName) {
+        return new MetricName(metricName, "group-id", "desc", Collections.<String, String>emptyMap());
+    }
+
+    protected Frequency freq(String name, double value) {
+        return new Frequency(name(name), value);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
index aba5e7a..473a33e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
@@ -33,12 +33,12 @@ public class HistogramTest {
 
     @Test
     public void testHistogram() {
-        BinScheme scheme = new ConstantBinScheme(12, -5, 5);
+        BinScheme scheme = new ConstantBinScheme(10, -5, 5);
         Histogram hist = new Histogram(scheme);
         for (int i = -5; i < 5; i++)
             hist.record(i);
         for (int i = 0; i < 10; i++)
-            assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0 + EPS), EPS);
+            assertEquals(scheme.fromBin(i), hist.value(i / 10.0 + EPS), EPS);
     }
 
     @Test
@@ -46,16 +46,90 @@ public class HistogramTest {
         ConstantBinScheme scheme = new ConstantBinScheme(5, -5, 5);
         assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-5.01));
         assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
-        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-5));
-        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5));
-        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(4.9999));
-        checkBinningConsistency(new ConstantBinScheme(4, 0, 5));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-5.0001));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-5.0000));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-4.99999));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-3.00001));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-3));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-1.00001));
+        assertEquals("Check boundary of bucket 2", 2, scheme.toBin(-1));
+        assertEquals("Check boundary of bucket 2", 2, scheme.toBin(0.99999));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(1));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(2.99999));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(3));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.9999));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.000));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.001));
+        assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+        assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(5), 0.001d);
+        assertEquals(-5.0, scheme.fromBin(0), 0.001d);
+        assertEquals(-3.0, scheme.fromBin(1), 0.001d);
+        assertEquals(-1.0, scheme.fromBin(2), 0.001d);
+        assertEquals(1.0, scheme.fromBin(3), 0.001d);
+        assertEquals(3.0, scheme.fromBin(4), 0.001d);
+        checkBinningConsistency(scheme);
+    }
+
+    @Test
+    public void testConstantBinSchemeWithPositiveRange() {
+        ConstantBinScheme scheme = new ConstantBinScheme(5, 0, 5);
+        assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-1.0));
+        assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-0.0001));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.0000));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.0001));
+        assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.9999));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.0000));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.0001));
+        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.9999));
+        assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.0000));
+        assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.0001));
+        assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.9999));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.0000));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.0001));
+        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.9999));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.0000));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.9999));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.0000));
+        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.0001));
+        assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+        assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(5), 0.001d);
+        assertEquals(0.0, scheme.fromBin(0), 0.001d);
+        assertEquals(1.0, scheme.fromBin(1), 0.001d);
+        assertEquals(2.0, scheme.fromBin(2), 0.001d);
+        assertEquals(3.0, scheme.fromBin(3), 0.001d);
+        assertEquals(4.0, scheme.fromBin(4), 0.001d);
         checkBinningConsistency(scheme);
     }
 
     @Test
     public void testLinearBinScheme() {
         LinearBinScheme scheme = new LinearBinScheme(10, 10);
+        assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+        assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(11), 0.001d);
+        assertEquals(0.0, scheme.fromBin(0), 0.001d);
+        assertEquals(0.2222, scheme.fromBin(1), 0.001d);
+        assertEquals(0.6666, scheme.fromBin(2), 0.001d);
+        assertEquals(1.3333, scheme.fromBin(3), 0.001d);
+        assertEquals(2.2222, scheme.fromBin(4), 0.001d);
+        assertEquals(3.3333, scheme.fromBin(5), 0.001d);
+        assertEquals(4.6667, scheme.fromBin(6), 0.001d);
+        assertEquals(6.2222, scheme.fromBin(7), 0.001d);
+        assertEquals(8.0000, scheme.fromBin(8), 0.001d);
+        assertEquals(10.000, scheme.fromBin(9), 0.001d);
+        assertEquals(0, scheme.toBin(0.0000));
+        assertEquals(0, scheme.toBin(0.2221));
+        assertEquals(1, scheme.toBin(0.2223));
+        assertEquals(2, scheme.toBin(0.6667));
+        assertEquals(3, scheme.toBin(1.3334));
+        assertEquals(4, scheme.toBin(2.2223));
+        assertEquals(5, scheme.toBin(3.3334));
+        assertEquals(6, scheme.toBin(4.6667));
+        assertEquals(7, scheme.toBin(6.2223));
+        assertEquals(8, scheme.toBin(8.0000));
+        assertEquals(9, scheme.toBin(10.000));
+        assertEquals(9, scheme.toBin(10.001));
+        assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(10), 0.001d);
         checkBinningConsistency(scheme);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 681a398..8c8ba6d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -30,10 +30,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +53,7 @@ public class ConnectMetrics {
     private final Metrics metrics;
     private final Time time;
     private final String workerId;
-    private final ConcurrentMap<String, MetricGroup> groupsByName = new ConcurrentHashMap<>();
+    private final ConcurrentMap<MetricGroupId, MetricGroup> groupsByName = new ConcurrentHashMap<>();
 
     /**
      * Create an instance.
@@ -107,6 +109,7 @@ public class ConnectMetrics {
 
     /**
      * Get or create a {@link MetricGroup} with the specified group name and the given tags.
+     * Each group is uniquely identified by the name and tags.
      *
      * @param groupName    the name of the metric group; may not be null and must be a
      *                     {@link #checkNameIsValid(String) valid name}
@@ -120,6 +123,7 @@ public class ConnectMetrics {
 
     /**
      * Get or create a {@link MetricGroup} with the specified group name and the given tags.
+     * Each group is uniquely identified by the name and tags.
      *
      * @param groupName       the name of the metric group; may not be null and must be a
      *                        {@link #checkNameIsValid(String) valid name}
@@ -129,16 +133,22 @@ public class ConnectMetrics {
      * @throws IllegalArgumentException if the group name is not valid
      */
     public MetricGroup group(String groupName, boolean includeWorkerId, String... tagKeyValues) {
-        MetricGroup group = groupsByName.get(groupName);
+        MetricGroupId groupId = groupId(groupName, includeWorkerId, tagKeyValues);
+        MetricGroup group = groupsByName.get(groupId);
         if (group == null) {
-            Map<String, String> tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
-            group = new MetricGroup(groupName, tags);
-            MetricGroup previous = groupsByName.putIfAbsent(groupName, group);
+            group = new MetricGroup(groupId);
+            MetricGroup previous = groupsByName.putIfAbsent(groupId, group);
             if (previous != null) group = previous;
         }
         return group;
     }
 
+    protected MetricGroupId groupId(String groupName, boolean includeWorkerId, String... tagKeyValues) {
+        checkNameIsValid(groupName);
+        Map<String, String> tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
+        return new MetricGroupId(groupName, tags);
+    }
+
     /**
      * Get the time.
      *
@@ -157,24 +167,94 @@ public class ConnectMetrics {
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
     }
 
+    public static class MetricGroupId {
+        private final String groupName;
+        private final Map<String, String> tags;
+        private final int hc; // hash code, computed once in the constructor
+        private final String str; // string form, built once in the constructor
+
+        public MetricGroupId(String groupName, Map<String, String> tags) {
+            assert groupName != null;
+            assert tags != null;
+            this.groupName = groupName;
+            this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags)); // private copy that keeps the tags' iteration order
+            this.hc = Objects.hash(this.groupName, this.tags);
+            StringBuilder sb = new StringBuilder(this.groupName);
+            for (Map.Entry<String, String> entry : this.tags.entrySet()) {
+                sb.append(";").append(entry.getKey()).append('=').append(entry.getValue()); // yields "group;key=value;..."
+            }
+            this.str = sb.toString();
+        }
+
+        /**
+         * Get the group name.
+         *
+         * @return the group name; never null
+         */
+        public String groupName() {
+            return groupName;
+        }
+
+        /**
+         * Get the immutable map of tag names and values.
+         *
+         * @return the tags; never null
+         */
+        public Map<String, String> tags() {
+            return tags;
+        }
+
+        /**
+         * Determine if the supplied metric name is part of this group identifier.
+         *
+         * @param metricName the metric name
+         * @return true if the metric name's group and tags match this group identifier, or false otherwise
+         */
+        public boolean includes(MetricName metricName) {
+            return metricName != null && groupName.equals(metricName.group()) && tags.equals(metricName.tags());
+        }
+
+        @Override
+        public int hashCode() {
+            return hc;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) return true;
+            if (obj instanceof MetricGroupId) {
+                MetricGroupId that = (MetricGroupId) obj;
+                return this.groupName.equals(that.groupName) && this.tags.equals(that.tags);
+            }
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return str;
+        }
+    }
+
     /**
      * A group of metrics. Each group maps to a JMX MBean and each metric maps to an MBean attribute.
+     * <p>
+     * Sensors should be added via the {@code sensor} methods on this class, rather than directly through
+     * the {@link Metrics} class, so that the sensor names are made to be unique (based on the group name)
+     * and so the sensors are removed when this group is {@link #close() closed}.
      */
     public class MetricGroup {
-        private final String groupName;
-        private final Map<String, String> tags;
+        private final MetricGroupId groupId;
+        private final Set<String> sensorNames = new HashSet<>();
+        private final String sensorPrefix;
 
         /**
          * Create a group of Connect metrics.
          *
-         * @param groupName the name of the group; may not be null and must be valid
-         * @param tags      the tags; may not be null but may be empty
-         * @throws IllegalArgumentException if the name is not valid
+         * @param groupId the identifier of the group; may not be null and must be valid
          */
-        protected MetricGroup(String groupName, Map<String, String> tags) {
-            checkNameIsValid(groupName);
-            this.groupName = groupName;
-            this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
+        protected MetricGroup(MetricGroupId groupId) {
+            this.groupId = groupId;
+            sensorPrefix = "connect-sensor-group: " + groupId.toString() + ";";
         }
 
         /**
@@ -187,11 +267,15 @@ public class ConnectMetrics {
          */
         public MetricName metricName(String name, String desc) {
             checkNameIsValid(name);
-            return metrics.metricName(name, groupName, desc, tags);
+            return metrics.metricName(name, groupId.groupName(), desc, groupId.tags());
         }
 
         /**
          * The {@link Metrics} that this group belongs to.
+         * <p>
+         * Do not use this to add {@link Sensor Sensors}, since they will not be removed when this group is
+         * {@link #close() closed}. Metrics can be added directly, as long as the metric names are obtained from
+         * this group via the {@link #metricName(String, String)} method.
          *
          * @return the metrics; never null
          */
@@ -205,7 +289,7 @@ public class ConnectMetrics {
          * @return the unmodifiable tags; never null but may be empty
          */
         Map<String, String> tags() {
-            return tags;
+            return groupId.tags();
         }
 
         /**
@@ -228,6 +312,86 @@ public class ConnectMetrics {
                 });
             }
         }
+
+        /**
+         * Get or create a sensor with the given unique name and no parent sensors. This uses
+         * a default recording level of INFO.
+         *
+         * @param name The sensor name
+         * @return The sensor
+         */
+        public Sensor sensor(String name) {
+            return sensor(name, null, Sensor.RecordingLevel.INFO);
+        }
+
+        /**
+         * Get or create an INFO-level sensor with the given unique name and zero or more parent sensors.
+         *
+         * @param name    The sensor name
+         * @param parents The parent sensors
+         * @return The sensor
+         */
+        public Sensor sensor(String name, Sensor... parents) {
+            return sensor(name, null, Sensor.RecordingLevel.INFO, parents);
+        }
+
+        /**
+         * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+         * receive every value recorded with this sensor.
+         *
+         * @param name           The name of the sensor
+         * @param recordingLevel The recording level.
+         * @param parents        The parent sensors
+         * @return The sensor that is created
+         */
+        public Sensor sensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+            return sensor(name, null, recordingLevel, parents);
+        }
+
+        /**
+         * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+         * receive every value recorded with this sensor.
+         *
+         * @param name    The name of the sensor
+         * @param config  A default configuration to use for this sensor for metrics that don't have their own config
+         * @param parents The parent sensors
+         * @return The sensor that is created
+         */
+        public Sensor sensor(String name, MetricConfig config, Sensor... parents) {
+            return sensor(name, config, Sensor.RecordingLevel.INFO, parents);
+        }
+
+        /**
+         * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+         * receive every value recorded with this sensor.
+         *
+         * @param name           The name of the sensor
+         * @param config         A default configuration to use for this sensor for metrics that don't have their own config
+         * @param recordingLevel The recording level.
+         * @param parents        The parent sensors
+         * @return The sensor that is created
+         */
+        public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+            // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
+            Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
+            if (result != null) sensorNames.add(result.name());
+            return result;
+        }
+
+        /**
+         * Remove all sensors and metrics associated with this group.
+         */
+        public synchronized void close() {
+            for (String sensorName : sensorNames) {
+                metrics.removeSensor(sensorName);
+            }
+            sensorNames.clear();
+            for (MetricName metricName : new HashSet<>(metrics.metrics().keySet())) {
+                if (groupId.includes(metricName)) {
+                    metrics.removeMetric(metricName);
+                }
+            }
+        }
     }
 
     /**
@@ -245,7 +409,7 @@ public class ConnectMetrics {
 
     /**
      * Create a set of tags using the supplied key and value pairs. Every tag name and value will be
-     * {@link #makeValidName(String) made valid} before it is used.
+     * {@link #makeValidName(String) made valid} before it is used. The order of the tags will be kept.
      *
      * @param workerId the worker ID that should be included first in the tags; may be null if not to be included
      * @param keyValue the key and value pairs for the tags; must be an even number
@@ -254,7 +418,7 @@ public class ConnectMetrics {
     static Map<String, String> tags(String workerId, String... keyValue) {
         if ((keyValue.length % 2) != 0)
             throw new IllegalArgumentException("keyValue needs to be specified in pairs");
-        Map<String, String> tags = new HashMap<>();
+        Map<String, String> tags = new LinkedHashMap<>();
         if (workerId != null && !workerId.trim().isEmpty()) {
             tags.put(WORKER_ID_TAG_NAME, makeValidName(workerId));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
new file mode 100644
index 0000000..566a6fc
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Utility class that tracks the current state and the duration of time spent in each state.
+ * This class is threadsafe.
+ */
+public class StateTracker {
+
+    private final AtomicReference<StateChange> lastState = new AtomicReference<>(new StateChange());
+
+    /**
+     * Change the current state.
+     * <p>
+     * This method is synchronized to ensure that all state changes are captured correctly and in the same order.
+     * Synchronization is acceptable since it is assumed that state changes will be relatively infrequent.
+     *
+     * @param newState the new state; may not be null
+     * @param now      the current time in milliseconds
+     */
+    public synchronized void changeState(State newState, long now) {
+        // JDK8: remove synchronization by using lastState.getAndUpdate(oldState->oldState.newState(newState, now));
+        lastState.set(lastState.get().newState(newState, now));
+    }
+
+    /**
+     * Calculate the ratio of time spent in the specified state.
+     *
+     * @param ratioState the state for which the ratio is to be calculated; may not be null
+     * @param now        the current time in milliseconds
+     * @return the ratio of time spent in the specified state to the time spent in all states, or 0.0 if none is known
+     */
+    public double durationRatio(State ratioState, long now) {
+        return lastState.get().durationRatio(ratioState, now);
+    }
+
+    /**
+     * Get the current state.
+     *
+     * @return the current state; may be null if no state change has been recorded
+     */
+    public State currentState() {
+        return lastState.get().state;
+    }
+
+    /**
+     * An immutable record of the accumulated times at the most recent state change. This class is required to
+     * efficiently make {@link StateTracker} threadsafe.
+     */
+    private static final class StateChange {
+
+        private final State state;
+        private final long startTime;
+        private final long unassignedTotalTimeMs;
+        private final long runningTotalTimeMs;
+        private final long pausedTotalTimeMs;
+        private final long failedTotalTimeMs;
+        private final long destroyedTotalTimeMs;
+
+        /**
+         * The initial StateChange instance before any state has changed.
+         */
+        StateChange() {
+            this(null, 0L, 0L, 0L, 0L, 0L, 0L);
+        }
+
+        StateChange(State state, long startTime, long unassignedTotalTimeMs, long runningTotalTimeMs,
+                            long pausedTotalTimeMs, long failedTotalTimeMs, long destroyedTotalTimeMs) {
+            this.state = state;
+            this.startTime = startTime;
+            this.unassignedTotalTimeMs = unassignedTotalTimeMs;
+            this.runningTotalTimeMs = runningTotalTimeMs;
+            this.pausedTotalTimeMs = pausedTotalTimeMs;
+            this.failedTotalTimeMs = failedTotalTimeMs;
+            this.destroyedTotalTimeMs = destroyedTotalTimeMs;
+        }
+
+        /**
+         * Return a new StateChange that includes the accumulated times of this state plus the time spent in the
+         * current state.
+         *
+         * @param state the new state; may not be null
+         * @param now   the time at which the state transition occurs.
+         * @return the new StateChange, though may be this instance if the state did not actually change; never null
+         */
+        public StateChange newState(State state, long now) {
+            if (this.state == null) {
+                return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L);
+            }
+            if (state == this.state) {
+                return this;
+            }
+            long unassignedTime = this.unassignedTotalTimeMs;
+            long runningTime = this.runningTotalTimeMs;
+            long pausedTime = this.pausedTotalTimeMs;
+            long failedTime = this.failedTotalTimeMs;
+            long destroyedTime = this.destroyedTotalTimeMs;
+            long duration = now - startTime;
+            switch (this.state) {
+                case UNASSIGNED:
+                    unassignedTime += duration;
+                    break;
+                case RUNNING:
+                    runningTime += duration;
+                    break;
+                case PAUSED:
+                    pausedTime += duration;
+                    break;
+                case FAILED:
+                    failedTime += duration;
+                    break;
+                case DESTROYED:
+                    destroyedTime += duration;
+                    break;
+            }
+            return new StateChange(state, now, unassignedTime, runningTime, pausedTime, failedTime, destroyedTime);
+        }
+
+        /**
+         * Calculate the ratio of time spent in the specified state.
+         *
+         * @param ratioState the state for which the ratio is to be calculated; may not be null
+         * @param now        the current time in milliseconds
+         * @return the ratio of time spent in the specified state to the time spent in all states, or 0.0 if none is known
+         */
+        public double durationRatio(State ratioState, long now) {
+            if (state == null) {
+                return 0.0d;
+            }
+            long durationCurrent = now - startTime; // since last state change
+            long durationDesired = ratioState == state ? durationCurrent : 0L;
+            switch (ratioState) {
+                case UNASSIGNED:
+                    durationDesired += unassignedTotalTimeMs;
+                    break;
+                case RUNNING:
+                    durationDesired += runningTotalTimeMs;
+                    break;
+                case PAUSED:
+                    durationDesired += pausedTotalTimeMs;
+                    break;
+                case FAILED:
+                    durationDesired += failedTotalTimeMs;
+                    break;
+                case DESTROYED:
+                    durationDesired += destroyedTotalTimeMs;
+                    break;
+            }
+            long total = durationCurrent + unassignedTotalTimeMs + runningTotalTimeMs + pausedTotalTimeMs +
+                                 failedTotalTimeMs + destroyedTotalTimeMs;
+            return total == 0L ? 0.0d : (double) durationDesired / total; // 0/0 as doubles would be NaN
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7ce9214..56bc341 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -428,10 +428,10 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
-                    valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, loader, time);
+                    valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
                     valueConverter, transformationChain, loader, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index aa9cdd1..49dbf4d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.runtime.AbstractStatus.State;
 import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -171,6 +170,8 @@ public class WorkerConnector {
             log.error("{} Error while shutting down connector", this, t);
             this.state = State.FAILED;
             statusListener.onFailure(connName, t);
+        } finally {
+            metrics.close();
         }
     }
 
@@ -244,6 +245,10 @@ public class WorkerConnector {
             });
         }
 
+        public void close() {
+            metricGroup.close();
+        }
+
         @Override
         public void onStartup(String connector) {
             state = AbstractStatus.State.RUNNING;

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5f2b39f..8f0f7fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -81,12 +81,13 @@ class WorkerSinkTask extends WorkerTask {
                           TaskStatus.Listener statusListener,
                           TargetState initialState,
                           WorkerConfig workerConfig,
+                          ConnectMetrics connectMetrics,
                           Converter keyConverter,
                           Converter valueConverter,
                           TransformationChain<SinkRecord> transformationChain,
                           ClassLoader loader,
                           Time time) {
-        super(id, statusListener, initialState, loader);
+        super(id, statusListener, initialState, loader, connectMetrics);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -137,6 +138,10 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
+    protected void releaseResources() {
+    }
+
+    @Override
     public void transitionTo(TargetState state) {
         super.transitionTo(state);
         consumer.wakeup();
@@ -211,18 +216,21 @@ class WorkerSinkTask extends WorkerTask {
             log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}",
                     this, seqno, commitSeqno);
         } else {
+            long durationMillis = time.milliseconds() - commitStarted;
             if (error != null) {
                 log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}",
                         this, seqno, committedOffsets, error);
                 commitFailures++;
+                recordCommitFailure(durationMillis, error);
             } else {
                 log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}",
-                        this, time.milliseconds() - commitStarted, seqno, committedOffsets);
+                        this, durationMillis, seqno, committedOffsets);
                 if (committedOffsets != null) {
                     log.debug("{} Setting last committed offsets to {}", this, committedOffsets);
                     lastCommittedOffsets = committedOffsets;
                 }
                 commitFailures = 0;
+                recordCommitSuccess(durationMillis);
             }
             committing = false;
         }
@@ -466,6 +474,7 @@ class WorkerSinkTask extends WorkerTask {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             task.put(new ArrayList<>(messageBatch));
+            recordBatch(messageBatch.size());
             currentOffsets.putAll(origOffsets);
             messageBatch.clear();
             // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6a17b71..92f7789 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -86,9 +86,10 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
+                            ConnectMetrics connectMetrics,
                             ClassLoader loader,
                             Time time) {
-        super(id, statusListener, initialState, loader);
+        super(id, statusListener, initialState, loader, connectMetrics);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -124,6 +125,10 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
+    protected void releaseResources() {
+    }
+
+    @Override
     public void stop() {
         super.stop();
         stopRequestedLatch.countDown();
@@ -186,6 +191,7 @@ class WorkerSourceTask extends WorkerTask {
      */
     private boolean sendRecords() {
         int processed = 0;
+        recordBatch(toSend.size());
         for (final SourceRecord preTransformRecord : toSend) {
             final SourceRecord record = transformationChain.apply(preTransformRecord);
 
@@ -303,6 +309,7 @@ class WorkerSourceTask extends WorkerTask {
                     if (timeoutMs <= 0) {
                         log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
                         finishFailedFlush();
+                        recordCommitFailure(time.milliseconds() - started, null);
                         return false;
                     }
                     this.wait(timeoutMs);
@@ -312,6 +319,7 @@ class WorkerSourceTask extends WorkerTask {
                     // to stop immediately
                     log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
                     finishFailedFlush();
+                    recordCommitFailure(time.milliseconds() - started, null);
                     return false;
                 }
             }
@@ -322,8 +330,10 @@ class WorkerSourceTask extends WorkerTask {
                 // flush time, which can be used for monitoring even if the connector doesn't record any
                 // offsets.
                 finishSuccessfulFlush();
+                long durationMillis = time.milliseconds() - started;
+                recordCommitSuccess(durationMillis);
                 log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                        this, time.milliseconds() - started);
+                        this, durationMillis);
 
                 commitSourceTask();
                 return true;
@@ -345,6 +355,7 @@ class WorkerSourceTask extends WorkerTask {
         // any data
         if (flushFuture == null) {
             finishFailedFlush();
+            recordCommitFailure(time.milliseconds() - started, null);
             return false;
         }
         try {
@@ -356,20 +367,25 @@ class WorkerSourceTask extends WorkerTask {
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
             finishFailedFlush();
+            recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
             finishFailedFlush();
+            recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", this);
             finishFailedFlush();
+            recordCommitFailure(time.milliseconds() - started, null);
             return false;
         }
 
         finishSuccessfulFlush();
+        long durationMillis = time.milliseconds() - started;
+        recordCommitSuccess(durationMillis);
         log.info("{} Finished commitOffsets successfully in {} ms",
-                this, time.milliseconds() - started);
+                this, durationMillis);
 
         commitSourceTask();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 3295434..44703dd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -42,6 +53,7 @@ abstract class WorkerTask implements Runnable {
     private final TaskStatus.Listener statusListener;
     protected final ClassLoader loader;
     private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final TaskMetricsGroup taskMetricsGroup;
     private volatile TargetState targetState;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
     private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
@@ -49,13 +61,16 @@ abstract class WorkerTask implements Runnable {
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
                       TargetState initialState,
-                      ClassLoader loader) {
+                      ClassLoader loader,
+                      ConnectMetrics connectMetrics) {
         this.id = id;
-        this.statusListener = statusListener;
+        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
+        this.statusListener = taskMetricsGroup;
         this.loader = loader;
         this.targetState = initialState;
         this.stopping = false;
         this.cancelled = false;
+        this.taskMetricsGroup.recordState(this.targetState);
     }
 
     public ConnectorTaskId id() {
@@ -68,6 +83,7 @@ abstract class WorkerTask implements Runnable {
 
     /**
      * Initialize the task for execution.
+     *
      * @param taskConfig initial configuration
      */
     public abstract void initialize(TaskConfig taskConfig);
@@ -116,6 +132,12 @@ abstract class WorkerTask implements Runnable {
 
     protected abstract void close();
 
+    /**
+     * Method called when this worker task has been completely closed, and when the subclass should clean up
+     * all resources.
+     */
+    protected abstract void releaseResources();
+
     protected boolean isStopping() {
         return stopping;
     }
@@ -195,8 +217,16 @@ abstract class WorkerTask implements Runnable {
             if (t instanceof Error)
                 throw (Error) t;
         } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
-            shutdownLatch.countDown();
+            try {
+                Plugins.compareAndSwapLoaders(savedLoader);
+                shutdownLatch.countDown();
+            } finally {
+                try {
+                    releaseResources();
+                } finally {
+                    taskMetricsGroup.close();
+                }
+            }
         }
     }
 
@@ -206,6 +236,7 @@ abstract class WorkerTask implements Runnable {
 
     /**
      * Await task resumption.
+     *
      * @return true if the task's target state is not paused, false if the task is shutdown before resumption
      * @throws InterruptedException
      */
@@ -231,4 +262,183 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
-}
+    /**
+     * Record that offsets have been committed.
+     *
+     * @param duration the length of time in milliseconds for the commit attempt to complete
+     */
+    protected void recordCommitSuccess(long duration) {
+        taskMetricsGroup.recordCommit(duration, true, null);
+    }
+
+    /**
+     * Record that an attempt to commit offsets has failed.
+     *
+     * @param duration the length of time in milliseconds for the commit attempt to complete
+     * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions
+     */
+    protected void recordCommitFailure(long duration, Throwable error) {
+        taskMetricsGroup.recordCommit(duration, false, error);
+    }
+
+    /**
+     * Record that a batch of records has been processed.
+     *
+     * @param size the number of records in the batch
+     */
+    protected void recordBatch(int size) {
+        taskMetricsGroup.recordBatch(size);
+    }
+
+    TaskMetricsGroup taskMetricsGroup() {
+        return taskMetricsGroup;
+    }
+
+    static class TaskMetricsGroup implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegateListener;
+        private final MetricGroup metricGroup;
+        private final Time time;
+        private final StateTracker taskStateTimer;
+        private final Sensor commitTime;
+        private final Sensor batchSize;
+        private final Sensor commitAttempts;
+
+        public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) {
+            delegateListener = statusListener;
+            time = connectMetrics.time();
+            taskStateTimer = new StateTracker();
+            metricGroup = connectMetrics.group("connector-tasks",
+                    "connector", id.connector(), "task", Integer.toString(id.task()));
+
+            addTaskStateMetric(State.UNASSIGNED, "status-unassigned",
+                    "Signals whether the connector task is in the unassigned state.");
+            addTaskStateMetric(State.RUNNING, "status-running",
+                    "Signals whether the connector task is in the running state.");
+            addTaskStateMetric(State.PAUSED, "status-paused",
+                    "Signals whether the connector task is in the paused state.");
+            addTaskStateMetric(State.FAILED, "status-failed",
+                    "Signals whether the connector task is in the failed state.");
+            addTaskStateMetric(State.DESTROYED, "status-destroyed",
+                    "Signals whether the connector task is in the destroyed state.");
+
+            addRatioMetric(State.RUNNING, "running-ratio",
+                    "The fraction of time this task has spent in the running state.");
+            addRatioMetric(State.PAUSED, "pause-ratio",
+                    "The fraction of time this task has spent in the paused state.");
+
+            commitTime = metricGroup.sensor("commit-time");
+            commitTime.add(metricGroup.metricName("offset-commit-max-time-ms",
+                    "The maximum time in milliseconds taken by this task to commit offsets"),
+                    new Max());
+            commitTime.add(metricGroup.metricName("offset-commit-avg-time-ms",
+                    "The average time in milliseconds taken by this task to commit offsets"),
+                    new Avg());
+
+            batchSize = metricGroup.sensor("batch-size");
+            batchSize.add(metricGroup.metricName("batch-size-max",
+                    "The maximum size of the batches processed by the connector"),
+                    new Max());
+            batchSize.add(metricGroup.metricName("batch-size-avg",
+                    "The average size of the batches processed by the connector"),
+                    new Avg());
+            // Percentage metrics: recordCommit() feeds this sensor 1.0 on success and 0.0 on failure
+            MetricName offsetCommitFailures = metricGroup.metricName("offset-commit-failure-percentage",
+                    "The average percentage of this task's offset commit attempts that failed");
+            MetricName offsetCommitSucceeds = metricGroup.metricName("offset-commit-success-percentage",
+                    "The average percentage of this task's offset commit attempts that succeeded");
+            Frequencies commitFrequencies = Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
+            commitAttempts = metricGroup.sensor("offset-commit-completion");
+            commitAttempts.add(commitFrequencies);
+        }
+
+        private void addTaskStateMetric(final State matchingState, String name, String description) {
+            metricGroup.addIndicatorMetric(name, description, new IndicatorPredicate() {
+                @Override
+                public boolean matches() {
+                    return matchingState == taskStateTimer.currentState();
+                }
+            });
+        }
+
+        private void addRatioMetric(final State matchingState, String name, String description) {
+            MetricName metricName = metricGroup.metricName(name, description);
+            if (metricGroup.metrics().metric(metricName) == null) {
+                metricGroup.metrics().addMetric(metricName, new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return taskStateTimer.durationRatio(matchingState, now);
+                    }
+                });
+            }
+        }
+
+        void close() {
+            metricGroup.close();
+        }
+
+        void recordCommit(long duration, boolean success, Throwable error) {
+            if (success) {
+                commitTime.record(duration);
+                commitAttempts.record(1.0d);
+            } else {
+                commitAttempts.record(0.0d);
+            }
+        }
+
+        void recordBatch(int size) {
+            batchSize.record(size);
+        }
+
+        @Override
+        public void onStartup(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+            delegateListener.onStartup(id);
+        }
+
+        @Override
+        public void onFailure(ConnectorTaskId id, Throwable cause) {
+            taskStateTimer.changeState(State.FAILED, time.milliseconds());
+            delegateListener.onFailure(id, cause);
+        }
+
+        @Override
+        public void onPause(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.PAUSED, time.milliseconds());
+            delegateListener.onPause(id);
+        }
+
+        @Override
+        public void onResume(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+            delegateListener.onResume(id);
+        }
+
+        @Override
+        public void onShutdown(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.UNASSIGNED, time.milliseconds());
+            delegateListener.onShutdown(id);
+        }
+
+        public void recordState(TargetState state) {
+            switch (state) {
+                case STARTED:
+                    taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+                    break;
+                case PAUSED:
+                    taskStateTimer.changeState(State.PAUSED, time.milliseconds());
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        public State state() {
+            return taskStateTimer.currentState();
+        }
+
+        double currentMetricValue(String name) {
+            MetricName metricName = metricGroup.metricName(name, "desc");
+            return metricGroup.metrics().metric(metricName).value();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 34997e6..6de0638 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
 import org.apache.kafka.connect.util.MockTime;
 import org.junit.After;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
@@ -141,6 +143,56 @@ public class ConnectMetricsTest {
         MetricGroup group3 = metrics.group("other");
         assertNotNull(group3);
         assertNotSame(group1, group3);
+
+        // Now with tags
+        MetricGroup group4 = metrics.group("name", "k1", "v1");
+        assertNotNull(group4);
+        assertNotSame(group1, group4);
+        assertNotSame(group2, group4);
+        assertNotSame(group3, group4);
+        MetricGroup group5 = metrics.group("name", "k1", "v1");
+        assertSame(group4, group5);
+    }
+
+    @Test
+    public void testMetricGroupIdIdentity() {
+        MetricGroupId id1 = metrics.groupId("name", false, "k1", "v1");
+        MetricGroupId id2 = metrics.groupId("name", false, "k1", "v1");
+        MetricGroupId id3 = metrics.groupId("name", false, "k1", "v1", "k2", "v2");
+
+        assertEquals(id1.hashCode(), id2.hashCode());
+        assertEquals(id1, id2);
+        assertEquals(id1.toString(), id2.toString());
+        assertEquals(id1.groupName(), id2.groupName());
+        assertEquals(id1.tags(), id2.tags());
+        assertNotNull(id1.tags());
+
+        assertNotEquals(id1, id3);
+    }
+
+    @Test
+    public void testMetricGroupIdWithoutTags() {
+        MetricGroupId id1 = metrics.groupId("name", false);
+        MetricGroupId id2 = metrics.groupId("name", false);
+
+        assertEquals(id1.hashCode(), id2.hashCode());
+        assertEquals(id1, id2);
+        assertEquals(id1.toString(), id2.toString());
+        assertEquals(id1.groupName(), id2.groupName());
+        assertEquals(id1.tags(), id2.tags());
+        assertNotNull(id1.tags());
+        assertNotNull(id2.tags());
+    }
+
+    @Test
+    public void testMetricGroupIdWithWorkerId() {
+        MetricGroupId id1 = metrics.groupId("name", true);
+        assertEquals(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME)); // compare the tag value, not just non-null
+        assertEquals("name;worker-id=worker1", id1.toString());
+
+        id1 = metrics.groupId("name", true, "k1", "v1", "k2", "v2");
+        assertEquals(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+        assertEquals("name;worker-id=worker1;k1=v1;k2=v2", id1.toString()); // maintain order of tags
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
new file mode 100644
index 0000000..0a61e10
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.MockTime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockConnectMetrics extends ConnectMetrics {
+
+    private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
+    static {
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+    }
+
+    public MockConnectMetrics() {
+        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+    }
+
+    @Override
+    public MockTime time() {
+        return (MockTime) super.time();
+    }
+}