Posted to commits@kafka.apache.org by ew...@apache.org on 2017/09/27 05:23:43 UTC
[2/2] kafka git commit: KAFKA-5900: Add task metrics common to both sink and source tasks
KAFKA-5900: Add task metrics common to both sink and source tasks
Added metrics that are common to both sink and source tasks.
Marked as "**WIP**" since this PR is built upon #3864, and will need to be rebased once that has been merged into `trunk`. However, I would still appreciate initial reviews since this PR is largely additive.
Author: Randall Hauch <rh...@gmail.com>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #3911 from rhauch/kafka-5900
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73cc4166
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73cc4166
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73cc4166
Branch: refs/heads/trunk
Commit: 73cc416664dbc8e1442f70cb3c4cd8f4d365ea50
Parents: 8256f88
Author: Randall Hauch <rh...@gmail.com>
Authored: Tue Sep 26 22:23:37 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Sep 26 22:23:37 2017 -0700
----------------------------------------------------------------------
.../kafka/common/metrics/stats/Frequencies.java | 192 ++++++++++++++++
.../kafka/common/metrics/stats/Frequency.java | 59 +++++
.../kafka/common/metrics/stats/Histogram.java | 115 +++++++---
.../kafka/common/metrics/stats/Percentiles.java | 1 +
.../common/metrics/stats/FrequenciesTest.java | 159 ++++++++++++++
.../common/metrics/stats/HistogramTest.java | 86 +++++++-
.../kafka/connect/runtime/ConnectMetrics.java | 202 +++++++++++++++--
.../kafka/connect/runtime/StateTracker.java | 173 +++++++++++++++
.../apache/kafka/connect/runtime/Worker.java | 4 +-
.../kafka/connect/runtime/WorkerConnector.java | 7 +-
.../kafka/connect/runtime/WorkerSinkTask.java | 13 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 22 +-
.../kafka/connect/runtime/WorkerTask.java | 220 ++++++++++++++++++-
.../connect/runtime/ConnectMetricsTest.java | 52 +++++
.../connect/runtime/MockConnectMetrics.java | 42 ++++
.../connect/runtime/MockConnectorMetrics.java | 42 ----
.../kafka/connect/runtime/StateTrackerTest.java | 100 +++++++++
.../connect/runtime/WorkerConnectorTest.java | 2 +-
.../connect/runtime/WorkerSinkTaskTest.java | 47 +++-
.../runtime/WorkerSinkTaskThreadedTest.java | 12 +-
.../connect/runtime/WorkerSourceTaskTest.java | 12 +-
.../kafka/connect/runtime/WorkerTaskTest.java | 159 +++++++++++++-
.../kafka/connect/runtime/WorkerTest.java | 11 +
23 files changed, 1607 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
new file mode 100644
index 0000000..52178d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequencies.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
+import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link CompoundStat} that represents a normalized distribution with a {@link Frequency} metric for each
+ * bucketed value. The values of the {@link Frequency} metrics specify the frequency of the center value appearing
+ * relative to the total number of values recorded.
+ * <p>
+ * For example, consider a component that records failure or success of an operation using boolean values, with
+ * one metric to capture the percentage of operations that failed and another to capture the percentage of operations
+ * that succeeded.
+ * <p>
+ * This can be accomplished by creating a {@link org.apache.kafka.common.metrics.Sensor Sensor} to record the values,
+ * with 0.0 for false and 1.0 for true. Then, create a single {@link Frequencies} object that has two
+ * {@link Frequency} metrics: one centered around 0.0 and another centered around 1.0. The {@link Frequencies}
+ * object is a {@link CompoundStat}, and so it can be {@link org.apache.kafka.common.metrics.Sensor#add(CompoundStat)
+ * added directly to a Sensor} so the metrics are created automatically.
+ */
+public class Frequencies extends SampledStat implements CompoundStat {
+
+ /**
+ * Create a Frequencies instance with metrics for the frequency of a boolean sensor that records 0.0 for
+ * false and 1.0 for true.
+ *
+ * @param falseMetricName the name of the metric capturing the frequency of failures; may be null if not needed
+ * @param trueMetricName the name of the metric capturing the frequency of successes; may be null if not needed
+ * @return the Frequencies instance; never null
+ * @throws IllegalArgumentException if both {@code falseMetricName} and {@code trueMetricName} are null
+ */
+ public static Frequencies forBooleanValues(MetricName falseMetricName, MetricName trueMetricName) {
+ List<Frequency> frequencies = new ArrayList<>();
+ if (falseMetricName != null) {
+ frequencies.add(new Frequency(falseMetricName, 0.0));
+ }
+ if (trueMetricName != null) {
+ frequencies.add(new Frequency(trueMetricName, 1.0));
+ }
+ if (frequencies.isEmpty()) {
+ throw new IllegalArgumentException("Must specify at least one metric name");
+ }
+ Frequency[] frequencyArray = frequencies.toArray(new Frequency[frequencies.size()]);
+ return new Frequencies(2, 0.0, 1.0, frequencyArray);
+ }
+
+ private final Frequency[] frequencies;
+ private final BinScheme binScheme;
+
+ /**
+ * Create a Frequencies that captures the values in the specified range into the given number of buckets,
+ * where the buckets are centered around the minimum, maximum, and intermediate values.
+ *
+ * @param buckets the number of buckets
+ * @param min the minimum value to be captured
+ * @param max the maximum value to be captured
+ * @param frequencies the list of {@link Frequency} metrics, of which there should be at most one per bucket,
+ * each centered on its bucket's value, though not every bucket needs a corresponding
+ * metric if its value is not of interest
+ * @throws IllegalArgumentException if any of the {@link Frequency} objects do not have a
+ * {@link Frequency#centerValue() center value} within the specified range
+ */
+ public Frequencies(int buckets, double min, double max, Frequency... frequencies) {
+ super(0.0); // initial value is unused by this implementation
+ if (max < min) {
+ throw new IllegalArgumentException("The maximum value " + max
+ + " must be greater than the minimum value " + min);
+ }
+ if (buckets < 2) {
+ throw new IllegalArgumentException("Must be at least 2 buckets");
+ }
+ if (buckets < frequencies.length) {
+ throw new IllegalArgumentException("More frequencies than buckets");
+ }
+ this.frequencies = frequencies;
+ for (Frequency freq : frequencies) {
+ if (min > freq.centerValue() || max < freq.centerValue()) {
+ throw new IllegalArgumentException("The frequency centered at '" + freq.centerValue()
+ + "' is not within the range [" + min + "," + max + "]");
+ }
+ }
+ double halfBucketWidth = (max - min) / (buckets - 1) / 2.0;
+ this.binScheme = new ConstantBinScheme(buckets, min - halfBucketWidth, max + halfBucketWidth);
+ }
+
+ @Override
+ public List<NamedMeasurable> stats() {
+ List<NamedMeasurable> ms = new ArrayList<>(frequencies.length);
+ for (Frequency frequency : frequencies) {
+ final double center = frequency.centerValue();
+ ms.add(new NamedMeasurable(frequency.name(), new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return frequency(config, now, center);
+ }
+ }));
+ }
+ return ms;
+ }
+
+ /**
+ * Return the computed frequency describing the number of occurrences of the values in the bucket for the given
+ * center point, relative to the total number of occurrences in the samples.
+ *
+ * @param config the metric configuration
+ * @param now the current time in milliseconds
+ * @param centerValue the value corresponding to the center point of the bucket
+ * @return the frequency of the values in the bucket relative to the total number of samples
+ */
+ public double frequency(MetricConfig config, long now, double centerValue) {
+ purgeObsoleteSamples(config, now);
+ long totalCount = 0;
+ for (Sample sample : samples) {
+ totalCount += sample.eventCount;
+ }
+ if (totalCount == 0) {
+ return 0.0d;
+ }
+ // Add up all of the counts in the bin corresponding to the center value
+ float count = 0.0f;
+ int binNum = binScheme.toBin(centerValue);
+ for (Sample s : samples) {
+ HistogramSample sample = (HistogramSample) s;
+ float[] hist = sample.histogram.counts();
+ count += hist[binNum];
+ }
+ // Compute the ratio of counts to total counts
+ return count / (double) totalCount;
+ }
+
+ double totalCount() {
+ long count = 0;
+ for (Sample sample : samples) {
+ count += sample.eventCount;
+ }
+ return count;
+ }
+
+ @Override
+ public double combine(List<Sample> samples, MetricConfig config, long now) {
+ return totalCount();
+ }
+
+ @Override
+ protected HistogramSample newSample(long timeMs) {
+ return new HistogramSample(binScheme, timeMs);
+ }
+
+ @Override
+ protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
+ HistogramSample hist = (HistogramSample) sample;
+ hist.histogram.record(value);
+ }
+
+ private static class HistogramSample extends SampledStat.Sample {
+
+ private final Histogram histogram;
+
+ private HistogramSample(BinScheme scheme, long now) {
+ super(0.0, now);
+ histogram = new Histogram(scheme);
+ }
+
+ @Override
+ public void reset(long now) {
+ super.reset(now);
+ histogram.clear();
+ }
+ }
+}
\ No newline at end of file
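For context, here is a minimal usage sketch of the new class (the sensor name, group name, and metric names below are illustrative, not part of this commit): a sensor records boolean outcomes as 0.0/1.0, and forBooleanValues() turns them into a pair of percentage metrics.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Frequencies;

    Metrics metrics = new Metrics();
    Sensor outcomes = metrics.sensor("operation-outcomes");
    MetricName failurePct = metrics.metricName("failure-percentage", "example-group",
            "Fraction of operations that failed");
    MetricName successPct = metrics.metricName("success-percentage", "example-group",
            "Fraction of operations that succeeded");
    // Frequencies is a CompoundStat, so adding it registers both metrics at once
    outcomes.add(Frequencies.forBooleanValues(failurePct, successPct));
    outcomes.record(0.0); // a failed operation
    outcomes.record(1.0); // a successful operation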
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
new file mode 100644
index 0000000..116d0c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Frequency.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+
+import org.apache.kafka.common.MetricName;
+
+/**
+ * Definition of a frequency metric used in a {@link Frequencies} compound statistic.
+ */
+public class Frequency {
+
+ private final MetricName name;
+ private final double centerValue;
+
+ /**
+ * Create an instance with the given name and center point value.
+ *
+ * @param name the name of the frequency metric; may not be null
+ * @param centerValue the value identifying the {@link Frequencies} bucket to be reported
+ */
+ public Frequency(MetricName name, double centerValue) {
+ this.name = name;
+ this.centerValue = centerValue;
+ }
+
+ /**
+ * Get the name of this metric.
+ *
+ * @return the metric name; never null
+ */
+ public MetricName name() {
+ return this.name;
+ }
+
+ /**
+ * Get the value of this metric's center point.
+ *
+ * @return the center point value
+ */
+ public double centerValue() {
+ return this.centerValue;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
index 3b1426e..af2b064 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
@@ -30,12 +30,16 @@ public class Histogram {
public void record(double value) {
this.hist[binScheme.toBin(value)] += 1.0f;
- this.count += 1.0f;
+ this.count += 1.0d;
}
public double value(double quantile) {
if (count == 0.0d)
return Double.NaN;
+ if (quantile > 1.00d)
+ return Float.POSITIVE_INFINITY;
+ if (quantile < 0.00d)
+ return Float.NEGATIVE_INFINITY;
float sum = 0.0f;
float quant = (float) quantile;
for (int i = 0; i < this.hist.length - 1; i++) {
@@ -43,7 +47,7 @@ public class Histogram {
if (sum / count > quant)
return binScheme.fromBin(i);
}
- return Float.POSITIVE_INFINITY;
+ return binScheme.fromBin(this.hist.length - 1);
}
public float[] counts() {
@@ -67,32 +71,70 @@ public class Histogram {
}
b.append(Float.POSITIVE_INFINITY);
b.append(':');
- b.append(this.hist[this.hist.length - 1]);
+ b.append(String.format("%.0f", this.hist[this.hist.length - 1]));
b.append('}');
return b.toString();
}
+ /**
+ * An algorithm for determining the bin in which a value is to be placed as well as calculating the upper end
+ * of each bin.
+ */
public interface BinScheme {
- public int bins();
- public int toBin(double value);
-
- public double fromBin(int bin);
+ /**
+ * Get the number of bins.
+ *
+ * @return the number of bins
+ */
+ int bins();
+
+ /**
+ * Determine the 0-based bin number in which the supplied value should be placed.
+ *
+ * @param value the value
+ * @return the 0-based index of the bin
+ */
+ int toBin(double value);
+
+ /**
+ * Determine the value at the upper range of the specified bin.
+ *
+ * @param bin the 0-based bin number
+ * @return the value at the upper end of the bin; or {@link Float#NEGATIVE_INFINITY negative infinity}
+ * if the bin number is negative or {@link Float#POSITIVE_INFINITY positive infinity} if the 0-based
+ * bin number is greater than or equal to the {@link #bins() number of bins}.
+ */
+ double fromBin(int bin);
}
+ /**
+ * A scheme for calculating the bins where the width of each bin is a constant determined by the range of values
+ * and the number of bins.
+ */
public static class ConstantBinScheme implements BinScheme {
+ private static final int MIN_BIN_NUMBER = 0;
private final double min;
private final double max;
private final int bins;
private final double bucketWidth;
-
+ private final int maxBinNumber;
+
+ /**
+ * Create a bin scheme with the specified number of bins that all have the same width.
+ *
+ * @param bins the number of bins; must be at least 2
+ * @param min the minimum value to be counted in the bins
+ * @param max the maximum value to be counted in the bins
+ */
public ConstantBinScheme(int bins, double min, double max) {
if (bins < 2)
throw new IllegalArgumentException("Must have at least 2 bins.");
this.min = min;
this.max = max;
this.bins = bins;
- this.bucketWidth = (max - min) / (bins - 2);
+ this.bucketWidth = (max - min) / bins;
+ this.maxBinNumber = bins - 1;
}
public int bins() {
@@ -100,33 +142,49 @@ public class Histogram {
}
public double fromBin(int b) {
- if (b == 0)
- return Double.NEGATIVE_INFINITY;
- else if (b == bins - 1)
- return Double.POSITIVE_INFINITY;
- else
- return min + (b - 1) * bucketWidth;
+ if (b < MIN_BIN_NUMBER) {
+ return Float.NEGATIVE_INFINITY;
+ }
+ if (b > maxBinNumber) {
+ return Float.POSITIVE_INFINITY;
+ }
+ return min + b * bucketWidth;
}
public int toBin(double x) {
- if (x < min)
- return 0;
- else if (x > max)
- return bins - 1;
- else
- return (int) ((x - min) / bucketWidth) + 1;
+ int binNumber = (int) ((x - min) / bucketWidth);
+ if (binNumber < MIN_BIN_NUMBER) {
+ return MIN_BIN_NUMBER;
+ }
+ if (binNumber > maxBinNumber) {
+ return maxBinNumber;
+ }
+ return binNumber;
}
}
+ /**
+ * A scheme for calculating the bins where each bin is one unit wider than the previous bin, so the bin
+ * widths increase at a linear rate. However, the bin widths are scaled such that the specified range
+ * of values will all fit within the bins (e.g., the upper range of the last bin is equal to the maximum value).
+ */
public static class LinearBinScheme implements BinScheme {
private final int bins;
private final double max;
private final double scale;
+ /**
+ * Create a linear bin scheme with the specified number of bins and the maximum value to be counted in the bins.
+ *
+ * @param numBins the number of bins; must be at least 2
+ * @param max the maximum value to be counted in the bins
+ */
public LinearBinScheme(int numBins, double max) {
+ if (numBins < 2)
+ throw new IllegalArgumentException("Must have at least 2 bins.");
this.bins = numBins;
this.max = max;
- int denom = numBins * (numBins - 1) / 2;
+ double denom = numBins * (numBins - 1.0) / 2.0;
this.scale = max / denom;
}
@@ -135,11 +193,12 @@ public class Histogram {
}
public double fromBin(int b) {
- if (b == this.bins - 1) {
+ if (b > this.bins - 1) {
return Float.POSITIVE_INFINITY;
+ } else if (b < 0.0000d) {
+ return Float.NEGATIVE_INFINITY;
} else {
- double unscaled = (b * (b + 1.0)) / 2.0;
- return unscaled * this.scale;
+ return this.scale * (b * (b + 1.0)) / 2.0;
}
}
@@ -149,10 +208,8 @@ public class Histogram {
} else if (x > this.max) {
return this.bins - 1;
} else {
- double scaled = x / this.scale;
- return (int) (-0.5 + Math.sqrt(2.0 * scaled + 0.25));
+ return (int) (-0.5 + 0.5 * Math.sqrt(1.0 + 8.0 * x / this.scale));
}
}
}
-
-}
+}
\ No newline at end of file
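To make the revised bin arithmetic concrete, a short sketch using values the updated tests also assert. With ConstantBinScheme(5, -5, 5) the bucket width is now (5 - (-5)) / 5 = 2 and fromBin() returns the bin's boundary value, reserving the infinities for out-of-range bin numbers; with LinearBinScheme(10, 10) the scale is max / (n * (n - 1) / 2) = 10.0 / 45, so the last bin ends exactly at the maximum.

    import org.apache.kafka.common.metrics.stats.Histogram;
    import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;

    BinScheme constant = new Histogram.ConstantBinScheme(5, -5, 5);
    constant.toBin(-3);    // 1, since (int) ((-3 - (-5)) / 2.0) == 1
    constant.fromBin(1);   // -3.0, since -5 + 1 * 2.0 == -3.0
    constant.fromBin(-1);  // -Infinity for a negative bin number

    BinScheme linear = new Histogram.LinearBinScheme(10, 10);
    linear.fromBin(9);     // 10.0, i.e. scale * 9 * 10 / 2 with scale == 10.0 / 45
    linear.toBin(10.0);    // 9, the last bin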
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 80236e9..e970f48 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -93,6 +93,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
return Double.POSITIVE_INFINITY;
}
+ @Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
return value(config, now, 0.5);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
new file mode 100644
index 0000000..9db18e2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class FrequenciesTest {
+
+ private static final double DELTA = 0.0001d;
+ private MetricConfig config;
+ private Time time;
+ private Metrics metrics;
+
+ @Before
+ public void setup() {
+ config = new MetricConfig().eventWindow(50).samples(2);
+ time = new MockTime();
+ metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+ }
+
+ @After
+ public void tearDown() {
+ metrics.close();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFrequencyCenterValueAboveMax() {
+ new Frequencies(4, 1.0, 4.0,
+ freq("1", 1.0), freq("2", 20.0));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFrequencyCenterValueBelowMin() {
+ new Frequencies(4, 1.0, 4.0,
+ freq("1", 1.0), freq("2", -20.0));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMoreFrequencyParametersThanBuckets() {
+ new Frequencies(1, 1.0, 4.0,
+ freq("1", 1.0), freq("2", -20.0));
+ }
+
+ @Test
+ public void testBooleanFrequencies() {
+ MetricName metricTrue = name("true");
+ MetricName metricFalse = name("false");
+ Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
+ final NamedMeasurable falseMetric = frequencies.stats().get(0);
+ final NamedMeasurable trueMetric = frequencies.stats().get(1);
+
+ // Record 2 windows worth of values
+ for (int i = 0; i != 25; ++i) {
+ frequencies.record(config, 0.0, time.milliseconds());
+ }
+ for (int i = 0; i != 75; ++i) {
+ frequencies.record(config, 1.0, time.milliseconds());
+ }
+ assertEquals(0.25, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
+ assertEquals(0.75, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
+
+ // Record 2 more windows worth of values
+ for (int i = 0; i != 40; ++i) {
+ frequencies.record(config, 0.0, time.milliseconds());
+ }
+ for (int i = 0; i != 60; ++i) {
+ frequencies.record(config, 1.0, time.milliseconds());
+ }
+ assertEquals(0.40, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
+ assertEquals(0.60, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
+ }
+
+ @Test
+ public void testUseWithMetrics() {
+ MetricName name1 = name("1");
+ MetricName name2 = name("2");
+ MetricName name3 = name("3");
+ MetricName name4 = name("4");
+ Frequencies frequencies = new Frequencies(4, 1.0, 4.0,
+ new Frequency(name1, 1.0),
+ new Frequency(name2, 2.0),
+ new Frequency(name3, 3.0),
+ new Frequency(name4, 4.0));
+ Sensor sensor = metrics.sensor("test", config);
+ sensor.add(frequencies);
+ Metric metric1 = this.metrics.metrics().get(name1);
+ Metric metric2 = this.metrics.metrics().get(name2);
+ Metric metric3 = this.metrics.metrics().get(name3);
+ Metric metric4 = this.metrics.metrics().get(name4);
+
+ // Record 2 windows worth of values
+ for (int i = 0; i != 100; ++i) {
+ frequencies.record(config, i % 4 + 1, time.milliseconds());
+ }
+ assertEquals(0.25, metric1.value(), DELTA);
+ assertEquals(0.25, metric2.value(), DELTA);
+ assertEquals(0.25, metric3.value(), DELTA);
+ assertEquals(0.25, metric4.value(), DELTA);
+
+ // Record 2 windows worth of values
+ for (int i = 0; i != 100; ++i) {
+ frequencies.record(config, i % 2 + 1, time.milliseconds());
+ }
+ assertEquals(0.50, metric1.value(), DELTA);
+ assertEquals(0.50, metric2.value(), DELTA);
+ assertEquals(0.00, metric3.value(), DELTA);
+ assertEquals(0.00, metric4.value(), DELTA);
+
+ // Record 1 window worth of values to overlap with the last window
+ // that is half 1.0 and half 2.0
+ for (int i = 0; i != 50; ++i) {
+ frequencies.record(config, 4.0, time.milliseconds());
+ }
+ assertEquals(0.25, metric1.value(), DELTA);
+ assertEquals(0.25, metric2.value(), DELTA);
+ assertEquals(0.00, metric3.value(), DELTA);
+ assertEquals(0.50, metric4.value(), DELTA);
+ }
+
+ protected MetricName name(String metricName) {
+ return new MetricName(metricName, "group-id", "desc", Collections.<String, String>emptyMap());
+ }
+
+ protected Frequency freq(String name, double value) {
+ return new Frequency(name(name), value);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
index aba5e7a..473a33e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
@@ -33,12 +33,12 @@ public class HistogramTest {
@Test
public void testHistogram() {
- BinScheme scheme = new ConstantBinScheme(12, -5, 5);
+ BinScheme scheme = new ConstantBinScheme(10, -5, 5);
Histogram hist = new Histogram(scheme);
for (int i = -5; i < 5; i++)
hist.record(i);
for (int i = 0; i < 10; i++)
- assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0 + EPS), EPS);
+ assertEquals(scheme.fromBin(i), hist.value(i / 10.0 + EPS), EPS);
}
@Test
@@ -46,16 +46,90 @@ public class HistogramTest {
ConstantBinScheme scheme = new ConstantBinScheme(5, -5, 5);
assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-5.01));
assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
- assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-5));
- assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5));
- assertEquals("Check boundary of bucket 3", 3, scheme.toBin(4.9999));
- checkBinningConsistency(new ConstantBinScheme(4, 0, 5));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-5.0001));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-5.0000));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-4.99999));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-3.00001));
+ assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-3));
+ assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-1.00001));
+ assertEquals("Check boundary of bucket 2", 2, scheme.toBin(-1));
+ assertEquals("Check boundary of bucket 2", 2, scheme.toBin(0.99999));
+ assertEquals("Check boundary of bucket 3", 3, scheme.toBin(1));
+ assertEquals("Check boundary of bucket 3", 3, scheme.toBin(2.99999));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(3));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.9999));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.000));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.001));
+ assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+ assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(5), 0.001d);
+ assertEquals(-5.0, scheme.fromBin(0), 0.001d);
+ assertEquals(-3.0, scheme.fromBin(1), 0.001d);
+ assertEquals(-1.0, scheme.fromBin(2), 0.001d);
+ assertEquals(1.0, scheme.fromBin(3), 0.001d);
+ assertEquals(3.0, scheme.fromBin(4), 0.001d);
+ checkBinningConsistency(scheme);
+ }
+
+ @Test
+ public void testConstantBinSchemeWithPositiveRange() {
+ ConstantBinScheme scheme = new ConstantBinScheme(5, 0, 5);
+ assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-1.0));
+ assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(-0.0001));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.0000));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.0001));
+ assertEquals("Check boundary of bucket 0", 0, scheme.toBin(0.9999));
+ assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.0000));
+ assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.0001));
+ assertEquals("Check boundary of bucket 1", 1, scheme.toBin(1.9999));
+ assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.0000));
+ assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.0001));
+ assertEquals("Check boundary of bucket 2", 2, scheme.toBin(2.9999));
+ assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.0000));
+ assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.0001));
+ assertEquals("Check boundary of bucket 3", 3, scheme.toBin(3.9999));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.0000));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(4.9999));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.0000));
+ assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5.0001));
+ assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+ assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(5), 0.001d);
+ assertEquals(0.0, scheme.fromBin(0), 0.001d);
+ assertEquals(1.0, scheme.fromBin(1), 0.001d);
+ assertEquals(2.0, scheme.fromBin(2), 0.001d);
+ assertEquals(3.0, scheme.fromBin(3), 0.001d);
+ assertEquals(4.0, scheme.fromBin(4), 0.001d);
checkBinningConsistency(scheme);
}
@Test
public void testLinearBinScheme() {
LinearBinScheme scheme = new LinearBinScheme(10, 10);
+ assertEquals(Float.NEGATIVE_INFINITY, scheme.fromBin(-1), 0.001d);
+ assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(11), 0.001d);
+ assertEquals(0.0, scheme.fromBin(0), 0.001d);
+ assertEquals(0.2222, scheme.fromBin(1), 0.001d);
+ assertEquals(0.6666, scheme.fromBin(2), 0.001d);
+ assertEquals(1.3333, scheme.fromBin(3), 0.001d);
+ assertEquals(2.2222, scheme.fromBin(4), 0.001d);
+ assertEquals(3.3333, scheme.fromBin(5), 0.001d);
+ assertEquals(4.6667, scheme.fromBin(6), 0.001d);
+ assertEquals(6.2222, scheme.fromBin(7), 0.001d);
+ assertEquals(8.0000, scheme.fromBin(8), 0.001d);
+ assertEquals(10.000, scheme.fromBin(9), 0.001d);
+ assertEquals(0, scheme.toBin(0.0000));
+ assertEquals(0, scheme.toBin(0.2221));
+ assertEquals(1, scheme.toBin(0.2223));
+ assertEquals(2, scheme.toBin(0.6667));
+ assertEquals(3, scheme.toBin(1.3334));
+ assertEquals(4, scheme.toBin(2.2223));
+ assertEquals(5, scheme.toBin(3.3334));
+ assertEquals(6, scheme.toBin(4.6667));
+ assertEquals(7, scheme.toBin(6.2223));
+ assertEquals(8, scheme.toBin(8.0000));
+ assertEquals(9, scheme.toBin(10.000));
+ assertEquals(9, scheme.toBin(10.001));
+ assertEquals(Float.POSITIVE_INFINITY, scheme.fromBin(10), 0.001d);
checkBinningConsistency(scheme);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 681a398..8c8ba6d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -30,10 +30,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -51,7 +53,7 @@ public class ConnectMetrics {
private final Metrics metrics;
private final Time time;
private final String workerId;
- private final ConcurrentMap<String, MetricGroup> groupsByName = new ConcurrentHashMap<>();
+ private final ConcurrentMap<MetricGroupId, MetricGroup> groupsByName = new ConcurrentHashMap<>();
/**
* Create an instance.
@@ -107,6 +109,7 @@ public class ConnectMetrics {
/**
* Get or create a {@link MetricGroup} with the specified group name and the given tags.
+ * Each group is uniquely identified by the name and tags.
*
* @param groupName the name of the metric group; may not be null and must be a
* {@link #checkNameIsValid(String) valid name}
@@ -120,6 +123,7 @@ public class ConnectMetrics {
/**
* Get or create a {@link MetricGroup} with the specified group name and the given tags.
+ * Each group is uniquely identified by the name and tags.
*
* @param groupName the name of the metric group; may not be null and must be a
* {@link #checkNameIsValid(String) valid name}
@@ -129,16 +133,22 @@ public class ConnectMetrics {
* @throws IllegalArgumentException if the group name is not valid
*/
public MetricGroup group(String groupName, boolean includeWorkerId, String... tagKeyValues) {
- MetricGroup group = groupsByName.get(groupName);
+ MetricGroupId groupId = groupId(groupName, includeWorkerId, tagKeyValues);
+ MetricGroup group = groupsByName.get(groupId);
if (group == null) {
- Map<String, String> tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
- group = new MetricGroup(groupName, tags);
- MetricGroup previous = groupsByName.putIfAbsent(groupName, group);
+ group = new MetricGroup(groupId);
+ MetricGroup previous = groupsByName.putIfAbsent(groupId, group);
if (previous != null) group = previous;
}
return group;
}
+ protected MetricGroupId groupId(String groupName, boolean includeWorkerId, String... tagKeyValues) {
+ checkNameIsValid(groupName);
+ Map<String, String> tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
+ return new MetricGroupId(groupName, tags);
+ }
+
/**
* Get the time.
*
@@ -157,24 +167,94 @@ public class ConnectMetrics {
AppInfoParser.unregisterAppInfo(JMX_PREFIX, workerId);
}
+ public static class MetricGroupId {
+ private final String groupName;
+ private final Map<String, String> tags;
+ private final int hc;
+ private final String str;
+
+ public MetricGroupId(String groupName, Map<String, String> tags) {
+ assert groupName != null;
+ assert tags != null;
+ this.groupName = groupName;
+ this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
+ this.hc = Objects.hash(this.groupName, this.tags);
+ StringBuilder sb = new StringBuilder(this.groupName);
+ for (Map.Entry<String, String> entry : this.tags.entrySet()) {
+ sb.append(";").append(entry.getKey()).append('=').append(entry.getValue());
+ }
+ this.str = sb.toString();
+ }
+
+ /**
+ * Get the group name.
+ *
+ * @return the group name; never null
+ */
+ public String groupName() {
+ return groupName;
+ }
+
+ /**
+ * Get the immutable map of tag names and values.
+ *
+ * @return the tags; never null
+ */
+ public Map<String, String> tags() {
+ return tags;
+ }
+
+ /**
+ * Determine if the supplied metric name is part of this group identifier.
+ *
+ * @param metricName the metric name
+ * @return true if the metric name's group and tags match this group identifier, or false otherwise
+ */
+ public boolean includes(MetricName metricName) {
+ return metricName != null && groupName.equals(metricName.group()) && tags.equals(metricName.tags());
+ }
+
+ @Override
+ public int hashCode() {
+ return hc;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) return true;
+ if (obj instanceof MetricGroupId) {
+ MetricGroupId that = (MetricGroupId) obj;
+ return this.groupName.equals(that.groupName) && this.tags.equals(that.tags);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return str;
+ }
+ }
+
/**
* A group of metrics. Each group maps to a JMX MBean and each metric maps to an MBean attribute.
+ * <p>
+ * Sensors should be added via the {@code sensor} methods on this class, rather than directly through
+ * the {@link Metrics} class, so that the sensor names are unique (scoped by the group name)
+ * and so that the sensors are removed when this group is {@link #close() closed}.
*/
public class MetricGroup {
- private final String groupName;
- private final Map<String, String> tags;
+ private final MetricGroupId groupId;
+ private final Set<String> sensorNames = new HashSet<>();
+ private final String sensorPrefix;
/**
* Create a group of Connect metrics.
*
- * @param groupName the name of the group; may not be null and must be valid
- * @param tags the tags; may not be null but may be empty
- * @throws IllegalArgumentException if the name is not valid
+ * @param groupId the identifier of the group; may not be null and must be valid
*/
- protected MetricGroup(String groupName, Map<String, String> tags) {
- checkNameIsValid(groupName);
- this.groupName = groupName;
- this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
+ protected MetricGroup(MetricGroupId groupId) {
+ this.groupId = groupId;
+ sensorPrefix = "connect-sensor-group: " + groupId.toString() + ";";
}
/**
@@ -187,11 +267,15 @@ public class ConnectMetrics {
*/
public MetricName metricName(String name, String desc) {
checkNameIsValid(name);
- return metrics.metricName(name, groupName, desc, tags);
+ return metrics.metricName(name, groupId.groupName(), desc, groupId.tags());
}
/**
* The {@link Metrics} that this group belongs to.
+ * <p>
+ * Do not use this to add {@link Sensor Sensors}, since they will not be removed when this group is
+ * {@link #close() closed}. Metrics can be added directly, as long as the metric names are obtained from
+ * this group via the {@link #metricName(String, String)} method.
*
* @return the metrics; never null
*/
@@ -205,7 +289,7 @@ public class ConnectMetrics {
* @return the unmodifiable tags; never null but may be empty
*/
Map<String, String> tags() {
- return tags;
+ return groupId.tags();
}
/**
@@ -228,6 +312,86 @@ public class ConnectMetrics {
});
}
}
+
+ /**
+ * Get or create a sensor with the given unique name and no parent sensors. This uses
+ * a default recording level of INFO.
+ *
+ * @param name The sensor name
+ * @return The sensor
+ */
+ public Sensor sensor(String name) {
+ return sensor(name, null, Sensor.RecordingLevel.INFO);
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and zero or more parent sensors. This uses
+ * a default recording level of INFO.
+ *
+ * @param name The sensor name
+ * @param parents The parent sensors
+ * @return The sensor
+ */
+ public Sensor sensor(String name, Sensor... parents) {
+ return sensor(name, null, Sensor.RecordingLevel.INFO, parents);
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+ * receive every value recorded with this sensor.
+ *
+ * @param name The name of the sensor
+ * @param recordingLevel The recording level.
+ * @param parents The parent sensors
+ * @return The sensor that is created
+ */
+ public Sensor sensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+ return sensor(name, null, recordingLevel, parents);
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+ * receive every value recorded with this sensor.
+ *
+ * @param name The name of the sensor
+ * @param config A default configuration to use for this sensor for metrics that don't have their own config
+ * @param parents The parent sensors
+ * @return The sensor that is created
+ */
+ public Sensor sensor(String name, MetricConfig config, Sensor... parents) {
+ return sensor(name, config, Sensor.RecordingLevel.INFO, parents);
+ }
+
+ /**
+ * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+ * receive every value recorded with this sensor.
+ *
+ * @param name The name of the sensor
+ * @param config A default configuration to use for this sensor for metrics that don't have their own config
+ * @param recordingLevel The recording level.
+ * @param parents The parent sensors
+ * @return The sensor that is created
+ */
+ public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+ // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
+ Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
+ if (result != null) sensorNames.add(result.name());
+ return result;
+ }
+
+ /**
+ * Remove all sensors and metrics associated with this group.
+ */
+ public synchronized void close() {
+ for (String sensorName : sensorNames) {
+ metrics.removeSensor(sensorName);
+ }
+ sensorNames.clear();
+ for (MetricName metricName : new HashSet<>(metrics.metrics().keySet())) {
+ if (groupId.includes(metricName)) {
+ metrics.removeMetric(metricName);
+ }
+ }
+ }
}
/**
@@ -245,7 +409,7 @@ public class ConnectMetrics {
/**
* Create a set of tags using the supplied key and value pairs. Every tag name and value will be
- * {@link #makeValidName(String) made valid} before it is used.
+ * {@link #makeValidName(String) made valid} before it is used. The order of the tags will be kept.
*
* @param workerId the worker ID that should be included first in the tags; may be null if not to be included
* @param keyValue the key and value pairs for the tags; must be an even number
@@ -254,7 +418,7 @@ public class ConnectMetrics {
static Map<String, String> tags(String workerId, String... keyValue) {
if ((keyValue.length % 2) != 0)
throw new IllegalArgumentException("keyValue needs to be specified in pairs");
- Map<String, String> tags = new HashMap<>();
+ Map<String, String> tags = new LinkedHashMap<>();
if (workerId != null && !workerId.trim().isEmpty()) {
tags.put(WORKER_ID_TAG_NAME, makeValidName(workerId));
}
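A sketch of how runtime code can use the reworked grouping, via the tag-based group(...) overload documented above (the group name, tags, and metric names below are illustrative): groups are now keyed by name plus tags, sensors created through a group get a group-specific prefix, and close() removes everything the group registered.

    // Assumes an existing ConnectMetrics instance, e.g. the worker's
    MetricGroup group = connectMetrics.group("task-metrics", "connector", "my-connector", "task", "0");
    Sensor batchSize = group.sensor("batch-size");
    batchSize.add(group.metricName("batch-size-avg", "Average number of records per batch"), new Avg());
    batchSize.record(42);
    group.close(); // removes the sensor and all metrics matching this group's name and tags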
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
new file mode 100644
index 0000000..566a6fc
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Utility class that tracks the current state and the duration of time spent in each state.
+ * This class is threadsafe.
+ */
+public class StateTracker {
+
+ private final AtomicReference<StateChange> lastState = new AtomicReference<>(new StateChange());
+
+ /**
+ * Change the current state.
+ * <p>
+ * This method is synchronized to ensure that all state changes are captured correctly and in the same order.
+ * Synchronization is acceptable since it is assumed that state changes will be relatively infrequent.
+ *
+ * @param newState the current state; may not be null
+ * @param now the current time in milliseconds
+ */
+ public synchronized void changeState(State newState, long now) {
+ // JDK8: remove synchronization by using lastState.getAndUpdate(oldState->oldState.newState(newState, now));
+ lastState.set(lastState.get().newState(newState, now));
+ }
+
+ /**
+ * Calculate the ratio of time spent in the specified state.
+ *
+ * @param ratioState the state for which the ratio is to be calculated; may not be null
+ * @param now the current time in milliseconds
+ * @return the ratio of time spent in the specified state to the time spent in all states
+ */
+ public double durationRatio(State ratioState, long now) {
+ return lastState.get().durationRatio(ratioState, now);
+ }
+
+ /**
+ * Get the current state.
+ *
+ * @return the current state; may be null if no state change has been recorded
+ */
+ public State currentState() {
+ return lastState.get().state;
+ }
+
+ /**
+ * An immutable record of the accumulated times at the most recent state change. This class is required to
+ * efficiently make {@link StateTracker} threadsafe.
+ */
+ private static final class StateChange {
+
+ private final State state;
+ private final long startTime;
+ private final long unassignedTotalTimeMs;
+ private final long runningTotalTimeMs;
+ private final long pausedTotalTimeMs;
+ private final long failedTotalTimeMs;
+ private final long destroyedTotalTimeMs;
+
+ /**
+ * The initial StateChange instance before any state has changed.
+ */
+ StateChange() {
+ this(null, 0L, 0L, 0L, 0L, 0L, 0L);
+ }
+
+ StateChange(State state, long startTime, long unassignedTotalTimeMs, long runningTotalTimeMs,
+ long pausedTotalTimeMs, long failedTotalTimeMs, long destroyedTotalTimeMs) {
+ this.state = state;
+ this.startTime = startTime;
+ this.unassignedTotalTimeMs = unassignedTotalTimeMs;
+ this.runningTotalTimeMs = runningTotalTimeMs;
+ this.pausedTotalTimeMs = pausedTotalTimeMs;
+ this.failedTotalTimeMs = failedTotalTimeMs;
+ this.destroyedTotalTimeMs = destroyedTotalTimeMs;
+ }
+
+ /**
+ * Return a new StateChange that includes the accumulated times of this state plus the time spent in the
+ * current state.
+ *
+ * @param state the new state; may not be null
+ * @param now the time at which the state transition occurs.
+ * @return the new StateChange, though it may be this instance if the state did not actually change; never null
+ */
+ public StateChange newState(State state, long now) {
+ if (this.state == null) {
+ return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L);
+ }
+ if (state == this.state) {
+ return this;
+ }
+ long unassignedTime = this.unassignedTotalTimeMs;
+ long runningTime = this.runningTotalTimeMs;
+ long pausedTime = this.pausedTotalTimeMs;
+ long failedTime = this.failedTotalTimeMs;
+ long destroyedTime = this.destroyedTotalTimeMs;
+ long duration = now - startTime;
+ switch (this.state) {
+ case UNASSIGNED:
+ unassignedTime += duration;
+ break;
+ case RUNNING:
+ runningTime += duration;
+ break;
+ case PAUSED:
+ pausedTime += duration;
+ break;
+ case FAILED:
+ failedTime += duration;
+ break;
+ case DESTROYED:
+ destroyedTime += duration;
+ break;
+ }
+ return new StateChange(state, now, unassignedTime, runningTime, pausedTime, failedTime, destroyedTime);
+ }
+
+ /**
+ * Calculate the ratio of time spent in the specified state.
+ *
+ * @param ratioState the state for which the ratio is to be calculated; may not be null
+ * @param now the current time in milliseconds
+ * @return the ratio of time spent in the specified state to the time spent in all states
+ */
+ public double durationRatio(State ratioState, long now) {
+ if (state == null) {
+ return 0.0d;
+ }
+ long durationCurrent = now - startTime; // since last state change
+ long durationDesired = ratioState == state ? durationCurrent : 0L;
+ switch (ratioState) {
+ case UNASSIGNED:
+ durationDesired += unassignedTotalTimeMs;
+ break;
+ case RUNNING:
+ durationDesired += runningTotalTimeMs;
+ break;
+ case PAUSED:
+ durationDesired += pausedTotalTimeMs;
+ break;
+ case FAILED:
+ durationDesired += failedTotalTimeMs;
+ break;
+ case DESTROYED:
+ durationDesired += destroyedTotalTimeMs;
+ break;
+ }
+ long total = durationCurrent + unassignedTotalTimeMs + runningTotalTimeMs + pausedTotalTimeMs +
+ failedTotalTimeMs + destroyedTotalTimeMs;
+ return (double) durationDesired / total;
+ }
+ }
+}
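A worked example of durationRatio(): a tracker that accumulated 80 ms in RUNNING and has currently been PAUSED for 20 ms reports a RUNNING ratio of 80 / (80 + 20) = 0.8.

    import org.apache.kafka.connect.runtime.AbstractStatus.State;

    StateTracker tracker = new StateTracker();
    tracker.changeState(State.RUNNING, 0L);  // enter RUNNING at t = 0 ms
    tracker.changeState(State.PAUSED, 80L);  // RUNNING accumulated 80 ms
    double ratio = tracker.durationRatio(State.RUNNING, 100L);
    // 20 ms has elapsed in PAUSED, so total == 100 ms and ratio == 0.8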
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7ce9214..56bc341 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -428,10 +428,10 @@ public class Worker {
internalKeyConverter, internalValueConverter);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
- valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, loader, time);
+ valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
- return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
+ return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
valueConverter, transformationChain, loader, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index aa9cdd1..49dbf4d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.runtime.AbstractStatus.State;
import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -171,6 +170,8 @@ public class WorkerConnector {
log.error("{} Error while shutting down connector", this, t);
this.state = State.FAILED;
statusListener.onFailure(connName, t);
+ } finally {
+ metrics.close();
}
}
@@ -244,6 +245,10 @@ public class WorkerConnector {
});
}
+ public void close() {
+ metricGroup.close();
+ }
+
@Override
public void onStartup(String connector) {
state = AbstractStatus.State.RUNNING;
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5f2b39f..8f0f7fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -81,12 +81,13 @@ class WorkerSinkTask extends WorkerTask {
TaskStatus.Listener statusListener,
TargetState initialState,
WorkerConfig workerConfig,
+ ConnectMetrics connectMetrics,
Converter keyConverter,
Converter valueConverter,
TransformationChain<SinkRecord> transformationChain,
ClassLoader loader,
Time time) {
- super(id, statusListener, initialState, loader);
+ super(id, statusListener, initialState, loader, connectMetrics);
this.workerConfig = workerConfig;
this.task = task;
@@ -137,6 +138,10 @@ class WorkerSinkTask extends WorkerTask {
}
@Override
+ protected void releaseResources() {
+ }
+
+ @Override
public void transitionTo(TargetState state) {
super.transitionTo(state);
consumer.wakeup();
@@ -211,18 +216,21 @@ class WorkerSinkTask extends WorkerTask {
log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}",
this, seqno, commitSeqno);
} else {
+ long durationMillis = time.milliseconds() - commitStarted;
if (error != null) {
log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}",
this, seqno, committedOffsets, error);
commitFailures++;
+ recordCommitFailure(durationMillis, error);
} else {
log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}",
- this, time.milliseconds() - commitStarted, seqno, committedOffsets);
+ this, durationMillis, seqno, committedOffsets);
if (committedOffsets != null) {
log.debug("{} Setting last committed offsets to {}", this, committedOffsets);
lastCommittedOffsets = committedOffsets;
}
commitFailures = 0;
+ recordCommitSuccess(durationMillis);
}
committing = false;
}
@@ -466,6 +474,7 @@ class WorkerSinkTask extends WorkerTask {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
task.put(new ArrayList<>(messageBatch));
+ recordBatch(messageBatch.size());
currentOffsets.putAll(origOffsets);
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6a17b71..92f7789 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -86,9 +86,10 @@ class WorkerSourceTask extends WorkerTask {
OffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
+ ConnectMetrics connectMetrics,
ClassLoader loader,
Time time) {
- super(id, statusListener, initialState, loader);
+ super(id, statusListener, initialState, loader, connectMetrics);
this.workerConfig = workerConfig;
this.task = task;
@@ -124,6 +125,10 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
+ protected void releaseResources() {
+ }
+
+ @Override
public void stop() {
super.stop();
stopRequestedLatch.countDown();
@@ -186,6 +191,7 @@ class WorkerSourceTask extends WorkerTask {
*/
private boolean sendRecords() {
int processed = 0;
+ recordBatch(toSend.size());
for (final SourceRecord preTransformRecord : toSend) {
final SourceRecord record = transformationChain.apply(preTransformRecord);
@@ -303,6 +309,7 @@ class WorkerSourceTask extends WorkerTask {
if (timeoutMs <= 0) {
log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, null);
return false;
}
this.wait(timeoutMs);
@@ -312,6 +319,7 @@ class WorkerSourceTask extends WorkerTask {
// to stop immediately
log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, null);
return false;
}
}
@@ -322,8 +330,10 @@ class WorkerSourceTask extends WorkerTask {
// flush time, which can be used for monitoring even if the connector doesn't record any
// offsets.
finishSuccessfulFlush();
+ long durationMillis = time.milliseconds() - started;
+ recordCommitSuccess(durationMillis);
log.debug("{} Finished offset commitOffsets successfully in {} ms",
- this, time.milliseconds() - started);
+ this, durationMillis);
commitSourceTask();
return true;
@@ -345,6 +355,7 @@ class WorkerSourceTask extends WorkerTask {
// any data
if (flushFuture == null) {
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, null);
return false;
}
try {
@@ -356,20 +367,25 @@ class WorkerSourceTask extends WorkerTask {
} catch (InterruptedException e) {
log.warn("{} Flush of offsets interrupted, cancelling", this);
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, e);
return false;
} catch (ExecutionException e) {
log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, e);
return false;
} catch (TimeoutException e) {
log.error("{} Timed out waiting to flush offsets to storage", this);
finishFailedFlush();
+ recordCommitFailure(time.milliseconds() - started, null);
return false;
}
finishSuccessfulFlush();
+ long durationMillis = time.milliseconds() - started;
+ recordCommitSuccess(durationMillis);
log.info("{} Finished commitOffsets successfully in {} ms",
- this, time.milliseconds() - started);
+ this, durationMillis);
commitSourceTask();
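Every exit path above measures against the same started timestamp, so the success and failure metrics see comparable durations. Condensed into one place, the pattern a WorkerTask subclass follows looks roughly like this; doCommit() is a hypothetical stand-in for the real flush-and-wait logic, and time is the task's own Time instance:

    private boolean commitWithMetrics() {
        long started = time.milliseconds();
        try {
            doCommit();                                            // hypothetical stand-in
            recordCommitSuccess(time.milliseconds() - started);    // max/avg commit time plus a success
            return true;
        } catch (Exception e) {
            recordCommitFailure(time.milliseconds() - started, e); // a failure; error may be null for timeouts
            return false;
        }
    }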
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 3295434..44703dd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -16,6 +16,17 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.AbstractStatus.State;
+import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -42,6 +53,7 @@ abstract class WorkerTask implements Runnable {
private final TaskStatus.Listener statusListener;
protected final ClassLoader loader;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+ private final TaskMetricsGroup taskMetricsGroup;
private volatile TargetState targetState;
private volatile boolean stopping; // indicates whether the Worker has asked the task to stop
private volatile boolean cancelled; // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
@@ -49,13 +61,16 @@ abstract class WorkerTask implements Runnable {
public WorkerTask(ConnectorTaskId id,
TaskStatus.Listener statusListener,
TargetState initialState,
- ClassLoader loader) {
+ ClassLoader loader,
+ ConnectMetrics connectMetrics) {
this.id = id;
- this.statusListener = statusListener;
+ this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
+ this.statusListener = taskMetricsGroup;
this.loader = loader;
this.targetState = initialState;
this.stopping = false;
this.cancelled = false;
+ this.taskMetricsGroup.recordState(this.targetState);
}
public ConnectorTaskId id() {
@@ -68,6 +83,7 @@ abstract class WorkerTask implements Runnable {
/**
* Initialize the task for execution.
+ *
* @param taskConfig initial configuration
*/
public abstract void initialize(TaskConfig taskConfig);
@@ -116,6 +132,12 @@ abstract class WorkerTask implements Runnable {
protected abstract void close();
+ /**
+ * Method called when this worker task has been completely closed, so that the subclass can clean up
+ * all of its resources.
+ */
+ protected abstract void releaseResources();
+
protected boolean isStopping() {
return stopping;
}
@@ -195,8 +217,16 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw (Error) t;
} finally {
- Plugins.compareAndSwapLoaders(savedLoader);
- shutdownLatch.countDown();
+ try {
+ Plugins.compareAndSwapLoaders(savedLoader);
+ shutdownLatch.countDown();
+ } finally {
+ try {
+ releaseResources();
+ } finally {
+ taskMetricsGroup.close();
+ }
+ }
}
}
@@ -206,6 +236,7 @@ abstract class WorkerTask implements Runnable {
/**
* Await task resumption.
+ *
* @return true if the task's target state is not paused, false if the task is shutdown before resumption
* @throws InterruptedException
*/
@@ -231,4 +262,183 @@ abstract class WorkerTask implements Runnable {
}
}
-}
+ /**
+ * Record that offsets have been committed.
+ *
+ * @param duration the length of time in milliseconds for the commit attempt to complete
+ */
+ protected void recordCommitSuccess(long duration) {
+ taskMetricsGroup.recordCommit(duration, true, null);
+ }
+
+ /**
+ * Record that an attempt to commit offsets failed.
+ *
+ * @param duration the length of time in milliseconds for the commit attempt to complete
+ * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions
+ */
+ protected void recordCommitFailure(long duration, Throwable error) {
+ taskMetricsGroup.recordCommit(duration, false, error);
+ }
+
+ /**
+ * Record that a batch of records has been processed.
+ *
+ * @param size the number of records in the batch
+ */
+ protected void recordBatch(int size) {
+ taskMetricsGroup.recordBatch(size);
+ }
+
+ TaskMetricsGroup taskMetricsGroup() {
+ return taskMetricsGroup;
+ }
+
+ static class TaskMetricsGroup implements TaskStatus.Listener {
+ private final TaskStatus.Listener delegateListener;
+ private final MetricGroup metricGroup;
+ private final Time time;
+ private final StateTracker taskStateTimer;
+ private final Sensor commitTime;
+ private final Sensor batchSize;
+ private final Sensor commitAttempts;
+
+ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) {
+ delegateListener = statusListener;
+ time = connectMetrics.time();
+ taskStateTimer = new StateTracker();
+ metricGroup = connectMetrics.group("connector-tasks",
+ "connector", id.connector(), "task", Integer.toString(id.task()));
+
+ addTaskStateMetric(State.UNASSIGNED, "status-unassigned",
+ "Signals whether the connector task is in the unassigned state.");
+ addTaskStateMetric(State.RUNNING, "status-running",
+ "Signals whether the connector task is in the running state.");
+ addTaskStateMetric(State.PAUSED, "status-paused",
+ "Signals whether the connector task is in the paused state.");
+ addTaskStateMetric(State.FAILED, "status-failed",
+ "Signals whether the connector task is in the failed state.");
+ addTaskStateMetric(State.DESTROYED, "status-destroyed",
+ "Signals whether the connector task is in the destroyed state.");
+
+ addRatioMetric(State.RUNNING, "running-ratio",
+ "The fraction of time this task has spent in the running state.");
+ addRatioMetric(State.PAUSED, "pause-ratio",
+ "The fraction of time this task has spent in the paused state.");
+
+ commitTime = metricGroup.sensor("commit-time");
+ commitTime.add(metricGroup.metricName("offset-commit-max-time-ms",
+ "The maximum time in milliseconds taken by this task to commit offsets"),
+ new Max());
+ commitTime.add(metricGroup.metricName("offset-commit-avg-time-ms",
+ "The average time in milliseconds taken by this task to commit offsets"),
+ new Avg());
+
+ batchSize = metricGroup.sensor("batch-size");
+ batchSize.add(metricGroup.metricName("batch-size-max",
+ "The maximum size of the batches processed by the connector"),
+ new Max());
+ batchSize.add(metricGroup.metricName("batch-size-avg",
+ "The average size of the batches processed by the connector"),
+ new Avg());
+
+ MetricName offsetCommitFailures = metricGroup.metricName("offset-commit-failure-percentage",
+ "The average percentage of this task's offset commit attempts that failed");
+ MetricName offsetCommitSucceeds = metricGroup.metricName("offset-commit-success-percentage",
+ "The average percentage of this task's offset commit attempts that failed");
+ Frequencies commitFrequencies = Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
+ commitAttempts = metricGroup.sensor("offset-commit-completion");
+ commitAttempts.add(commitFrequencies);
+ }
+
+ private void addTaskStateMetric(final State matchingState, String name, String description) {
+ metricGroup.addIndicatorMetric(name, description, new IndicatorPredicate() {
+ @Override
+ public boolean matches() {
+ return matchingState == taskStateTimer.currentState();
+ }
+ });
+ }
+
+ private void addRatioMetric(final State matchingState, String name, String description) {
+ MetricName metricName = metricGroup.metricName(name, description);
+ if (metricGroup.metrics().metric(metricName) == null) {
+ metricGroup.metrics().addMetric(metricName, new Measurable() {
+ @Override
+ public double measure(MetricConfig config, long now) {
+ return taskStateTimer.durationRatio(matchingState, now);
+ }
+ });
+ }
+ }
+
+ void close() {
+ metricGroup.close();
+ }
+
+ void recordCommit(long duration, boolean success, Throwable error) {
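+ // Note: duration is recorded only for successful commits; failed attempts contribute only to
+ // the failure frequency, and the passed-in error is not currently captured by any metric.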
+ if (success) {
+ commitTime.record(duration);
+ commitAttempts.record(1.0d);
+ } else {
+ commitAttempts.record(0.0d);
+ }
+ }
+
+ void recordBatch(int size) {
+ batchSize.record(size);
+ }
+
+ @Override
+ public void onStartup(ConnectorTaskId id) {
+ taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+ delegateListener.onStartup(id);
+ }
+
+ @Override
+ public void onFailure(ConnectorTaskId id, Throwable cause) {
+ taskStateTimer.changeState(State.FAILED, time.milliseconds());
+ delegateListener.onFailure(id, cause);
+ }
+
+ @Override
+ public void onPause(ConnectorTaskId id) {
+ taskStateTimer.changeState(State.PAUSED, time.milliseconds());
+ delegateListener.onPause(id);
+ }
+
+ @Override
+ public void onResume(ConnectorTaskId id) {
+ taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+ delegateListener.onResume(id);
+ }
+
+ @Override
+ public void onShutdown(ConnectorTaskId id) {
+ taskStateTimer.changeState(State.UNASSIGNED, time.milliseconds());
+ delegateListener.onShutdown(id);
+ }
+
+ public void recordState(TargetState state) {
+ switch (state) {
+ case STARTED:
+ taskStateTimer.changeState(State.RUNNING, time.milliseconds());
+ break;
+ case PAUSED:
+ taskStateTimer.changeState(State.PAUSED, time.milliseconds());
+ break;
+ default:
+ break;
+ }
+ }
+
+ public State state() {
+ return taskStateTimer.currentState();
+ }
+
+ double currentMetricValue(String name) {
+ MetricName metricName = metricGroup.metricName(name, "desc");
+ return metricGroup.metrics().metric(metricName).value();
+ }
+ }
+}
\ No newline at end of file
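The success/failure percentages above come from a single Frequencies compound stat over boolean-style recordings: 1.0 for a successful attempt, 0.0 for a failed one. A standalone sketch of that mechanism, again with an illustrative group name:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Frequencies;

    public class CommitFrequencySketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            MetricName failurePct = metrics.metricName("offset-commit-failure-percentage",
                    "example-group", "The average percentage of commit attempts that failed");
            MetricName successPct = metrics.metricName("offset-commit-success-percentage",
                    "example-group", "The average percentage of commit attempts that succeeded");
            Sensor attempts = metrics.sensor("offset-commit-completion");
            attempts.add(Frequencies.forBooleanValues(failurePct, successPct)); // false metric first, then true

            attempts.record(1.0d); // success
            attempts.record(1.0d); // success
            attempts.record(0.0d); // failure

            System.out.println(metrics.metric(successPct).value()); // ~0.667 within the sample window
            System.out.println(metrics.metric(failurePct).value()); // ~0.333 within the sample window
        }
    }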
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 34997e6..6de0638 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
import org.apache.kafka.connect.util.MockTime;
import org.junit.After;
import org.junit.Before;
@@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
@@ -141,6 +143,56 @@ public class ConnectMetricsTest {
MetricGroup group3 = metrics.group("other");
assertNotNull(group3);
assertNotSame(group1, group3);
+
+ // Now with tags
+ MetricGroup group4 = metrics.group("name", "k1", "v1");
+ assertNotNull(group4);
+ assertNotSame(group1, group4);
+ assertNotSame(group2, group4);
+ assertNotSame(group3, group4);
+ MetricGroup group5 = metrics.group("name", "k1", "v1");
+ assertSame(group4, group5);
+ }
+
+ @Test
+ public void testMetricGroupIdIdentity() {
+ MetricGroupId id1 = metrics.groupId("name", false, "k1", "v1");
+ MetricGroupId id2 = metrics.groupId("name", false, "k1", "v1");
+ MetricGroupId id3 = metrics.groupId("name", false, "k1", "v1", "k2", "v2");
+
+ assertEquals(id1.hashCode(), id2.hashCode());
+ assertEquals(id1, id2);
+ assertEquals(id1.toString(), id2.toString());
+ assertEquals(id1.groupName(), id2.groupName());
+ assertEquals(id1.tags(), id2.tags());
+ assertNotNull(id1.tags());
+
+ assertNotEquals(id1, id3);
+ }
+
+ @Test
+ public void testMetricGroupIdWithoutTags() {
+ MetricGroupId id1 = metrics.groupId("name", false);
+ MetricGroupId id2 = metrics.groupId("name", false);
+
+ assertEquals(id1.hashCode(), id2.hashCode());
+ assertEquals(id1, id2);
+ assertEquals(id1.toString(), id2.toString());
+ assertEquals(id1.groupName(), id2.groupName());
+ assertEquals(id1.tags(), id2.tags());
+ assertNotNull(id1.tags());
+ assertNotNull(id2.tags());
+ }
+
+ @Test
+ public void testMetricGroupIdWithWorkerId() {
+ MetricGroupId id1 = metrics.groupId("name", true);
+ assertEquals(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+ assertEquals("name;worker-id=worker1", id1.toString());
+
+ id1 = metrics.groupId("name", true, "k1", "v1", "k2", "v2");
+ assertEquals(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+ assertEquals("name;worker-id=worker1;k1=v1;k2=v2", id1.toString()); // maintain order of tags
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/73cc4166/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
new file mode 100644
index 0000000..0a61e10
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.MockTime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockConnectMetrics extends ConnectMetrics {
+
+ private static final Map<String, String> DEFAULT_WORKER_CONFIG = new HashMap<>();
+ static {
+ DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ }
+
+ public MockConnectMetrics() {
+ super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+ }
+
+ @Override
+ public MockTime time() {
+ return (MockTime) super.time();
+ }
+}
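Because MockConnectMetrics swaps in a MockTime, the state-ratio metrics become deterministic in tests. A sketch of how a test in the same package might exercise it (TaskMetricsGroup is package-private); the no-op listener below is a stand-in for the EasyMock listeners the real tests use:

    package org.apache.kafka.connect.runtime;

    import org.apache.kafka.connect.util.ConnectorTaskId;

    public class TaskMetricsGroupSketch {
        public static void main(String[] args) {
            ConnectorTaskId taskId = new ConnectorTaskId("my-connector", 0);
            MockConnectMetrics connectMetrics = new MockConnectMetrics();
            TaskStatus.Listener noOp = new TaskStatus.Listener() {
                public void onStartup(ConnectorTaskId id) { }
                public void onPause(ConnectorTaskId id) { }
                public void onResume(ConnectorTaskId id) { }
                public void onFailure(ConnectorTaskId id, Throwable cause) { }
                public void onShutdown(ConnectorTaskId id) { }
            };
            WorkerTask.TaskMetricsGroup group = new WorkerTask.TaskMetricsGroup(taskId, connectMetrics, noOp);

            group.onStartup(taskId);                 // task enters RUNNING
            connectMetrics.time().sleep(3000L);
            group.onPause(taskId);                   // task enters PAUSED
            connectMetrics.time().sleep(1000L);

            // roughly 0.75 running and 0.25 paused over the observed window
            System.out.println(group.currentMetricValue("running-ratio"));
            System.out.println(group.currentMetricValue("pause-ratio"));
        }
    }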