You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/10/05 18:23:14 UTC
[2/2] kafka git commit: KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)
KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)
Added metrics to the Connect worker and rebalancing metrics to the distributed herder.
This is built on top of #3987, and I can rebase this PR once that is merged.
Author: Randall Hauch <rh...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4011 from rhauch/kafka-5903
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a47bfbca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a47bfbca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a47bfbca
Branch: refs/heads/trunk
Commit: a47bfbcae050659d32f777ed2f4b26dda5fbdbbd
Parents: ff5fc9d
Author: Randall Hauch <rh...@gmail.com>
Authored: Thu Oct 5 11:23:11 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Oct 5 11:23:11 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/ConnectMetrics.java | 61 +++++++---
.../connect/runtime/ConnectMetricsRegistry.java | 121 ++++++++++++++-----
.../apache/kafka/connect/runtime/Worker.java | 111 +++++++++++++++++
.../kafka/connect/runtime/WorkerConnector.java | 37 ++++--
.../kafka/connect/runtime/WorkerTask.java | 23 ++--
.../runtime/distributed/DistributedHerder.java | 87 ++++++++++++-
.../connect/runtime/MockConnectMetrics.java | 82 +++++++++++--
.../connect/runtime/WorkerConnectorTest.java | 44 ++++++-
.../connect/runtime/WorkerSinkTaskTest.java | 54 ++++-----
.../connect/runtime/WorkerSourceTaskTest.java | 32 ++---
.../kafka/connect/runtime/WorkerTaskTest.java | 4 +-
.../kafka/connect/runtime/WorkerTest.java | 101 ++++++++++++++--
.../distributed/DistributedHerderTest.java | 92 +++++++++++++-
13 files changed, 704 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 974967a..3cd1eae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -19,8 +19,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
@@ -68,9 +68,11 @@ public class ConnectMetrics {
this.time = time;
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
- .recordLevel(Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
- List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+ .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS).recordLevel(
+ Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
+ List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
@@ -121,7 +123,8 @@ public class ConnectMetrics {
if (group == null) {
group = new MetricGroup(groupId);
MetricGroup previous = groupsByName.putIfAbsent(groupId, group);
- if (previous != null) group = previous;
+ if (previous != null)
+ group = previous;
}
return group;
}
@@ -204,7 +207,8 @@ public class ConnectMetrics {
@Override
public boolean equals(Object obj) {
- if (obj == this) return true;
+ if (obj == this)
+ return true;
if (obj instanceof MetricGroupId) {
MetricGroupId that = (MetricGroupId) obj;
return this.groupName.equals(that.groupName) && this.tags.equals(that.tags);
@@ -290,19 +294,38 @@ public class ConnectMetrics {
}
/**
- * Add to this group an indicator metric with a function that will be used to obtain the indicator state.
+ * Add to this group an indicator metric with a function that returns the current value.
*
* @param nameTemplate the name template for the metric; may not be null
- * @param predicate the predicate function used to determine the indicator state; may not be null
+ * @param supplier the function used to determine the literal value of the metric; may not be null
* @throws IllegalArgumentException if the name is not valid
*/
- public void addIndicatorMetric(MetricNameTemplate nameTemplate, final IndicatorPredicate predicate) {
+ public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) {
- metrics().addMetric(metricName, new Measurable() {
+ metrics().addMetric(metricName, new Gauge<T>() {
@Override
- public double measure(MetricConfig config, long now) {
- return predicate.matches() ? 1.0d : 0.0d;
+ public T value(MetricConfig config, long now) {
+ return supplier.metricValue(now);
+ }
+ });
+ }
+ }
+
+ /**
+ * Add to this group an indicator metric that always returns the specified value.
+ *
+ * @param nameTemplate the name template for the metric; may not be null
+ * @param value the value; may not be null
+ * @throws IllegalArgumentException if the name is not valid
+ */
+ public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
+ MetricName metricName = metricName(nameTemplate);
+ if (metrics().metric(metricName) == null) {
+ metrics().addMetric(metricName, new Gauge<T>() {
+ @Override
+ public T value(MetricConfig config, long now) {
+ return value;
}
});
}
@@ -369,7 +392,8 @@ public class ConnectMetrics {
public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
// We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
- if (result != null) sensorNames.add(result.name());
+ if (result != null)
+ sensorNames.add(result.name());
return result;
}
@@ -390,16 +414,17 @@ public class ConnectMetrics {
}
/**
- * A simple functional interface that determines whether an indicator metric is true.
+ * A simple functional interface that returns a literal value.
*/
- public interface IndicatorPredicate {
+ public interface LiteralSupplier<T> {
/**
- * Return whether the indicator metric is true.
+ * Return the literal value for the metric.
*
- * @return true if the indicator metric is satisfied, or false otherwise
+ * @param now the current time in milliseconds
+ * @return the literal metric value; may not be null
*/
- boolean matches();
+ T metricValue(long now);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index ee513c9..d78576e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -32,16 +32,15 @@ public class ConnectMetricsRegistry {
public static final String TASK_GROUP_NAME = "connector-task-metrics";
public static final String SOURCE_TASK_GROUP_NAME = "source-task-metrics";
public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
+ public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
+ public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics";
private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
- public final MetricNameTemplate connectorStatusRunning;
- public final MetricNameTemplate connectorStatusPaused;
- public final MetricNameTemplate connectorStatusFailed;
- public final MetricNameTemplate taskStatusUnassigned;
- public final MetricNameTemplate taskStatusRunning;
- public final MetricNameTemplate taskStatusPaused;
- public final MetricNameTemplate taskStatusFailed;
- public final MetricNameTemplate taskStatusDestroyed;
+ public final MetricNameTemplate connectorStatus;
+ public final MetricNameTemplate connectorType;
+ public final MetricNameTemplate connectorClass;
+ public final MetricNameTemplate connectorVersion;
+ public final MetricNameTemplate taskStatus;
public final MetricNameTemplate taskRunningRatio;
public final MetricNameTemplate taskPauseRatio;
public final MetricNameTemplate taskCommitTimeMax;
@@ -75,6 +74,25 @@ public class ConnectMetricsRegistry {
public final MetricNameTemplate sinkRecordActiveCount;
public final MetricNameTemplate sinkRecordActiveCountMax;
public final MetricNameTemplate sinkRecordActiveCountAvg;
+ public final MetricNameTemplate connectorCount;
+ public final MetricNameTemplate taskCount;
+ public final MetricNameTemplate connectorStartupAttemptsTotal;
+ public final MetricNameTemplate connectorStartupSuccessTotal;
+ public final MetricNameTemplate connectorStartupSuccessPercentage;
+ public final MetricNameTemplate connectorStartupFailureTotal;
+ public final MetricNameTemplate connectorStartupFailurePercentage;
+ public final MetricNameTemplate taskStartupAttemptsTotal;
+ public final MetricNameTemplate taskStartupSuccessTotal;
+ public final MetricNameTemplate taskStartupSuccessPercentage;
+ public final MetricNameTemplate taskStartupFailureTotal;
+ public final MetricNameTemplate taskStartupFailurePercentage;
+ public final MetricNameTemplate leaderName;
+ public final MetricNameTemplate epoch;
+ public final MetricNameTemplate rebalanceCompletedTotal;
+ public final MetricNameTemplate rebalanceMode;
+ public final MetricNameTemplate rebalanceTimeMax;
+ public final MetricNameTemplate rebalanceTimeAvg;
+ public final MetricNameTemplate rebalanceTimeSinceLast;
public ConnectMetricsRegistry() {
this(new LinkedHashSet<String>());
@@ -85,28 +103,25 @@ public class ConnectMetricsRegistry {
Set<String> connectorTags = new LinkedHashSet<>(tags);
connectorTags.add(CONNECTOR_TAG_NAME);
- connectorStatusRunning = createTemplate("status-running", CONNECTOR_GROUP_NAME,
- "Signals whether the connector is in the running state.", connectorTags);
- connectorStatusPaused = createTemplate("status-paused", CONNECTOR_GROUP_NAME,
- "Signals whether the connector is in the paused state.", connectorTags);
- connectorStatusFailed = createTemplate("status-failed", CONNECTOR_GROUP_NAME,
- "Signals whether the connector is in the failed state.", connectorTags);
+ connectorStatus = createTemplate("status", CONNECTOR_GROUP_NAME,
+ "The status of the connector. One of 'unassigned', 'running', 'paused', 'failed', or " +
+ "'destroyed'.",
+ connectorTags);
+ connectorType = createTemplate("connector-type", CONNECTOR_GROUP_NAME, "The type of the connector. One of 'source' or 'sink'.",
+ connectorTags);
+ connectorClass = createTemplate("connector-class", CONNECTOR_GROUP_NAME, "The name of the connector class.", connectorTags);
+ connectorVersion = createTemplate("connector-version", CONNECTOR_GROUP_NAME,
+ "The version of the connector class, as reported by the connector.", connectorTags);
/***** Worker task level *****/
Set<String> workerTaskTags = new LinkedHashSet<>(tags);
workerTaskTags.add(CONNECTOR_TAG_NAME);
workerTaskTags.add(TASK_TAG_NAME);
- taskStatusUnassigned = createTemplate("status-unassigned", TASK_GROUP_NAME, "Signals whether this task is in the unassigned state.",
- workerTaskTags);
- taskStatusRunning = createTemplate("status-running", TASK_GROUP_NAME, "Signals whether this task is in the running state.",
- workerTaskTags);
- taskStatusPaused = createTemplate("status-paused", TASK_GROUP_NAME, "Signals whether this task is in the paused state.",
- workerTaskTags);
- taskStatusFailed = createTemplate("status-failed", TASK_GROUP_NAME, "Signals whether this task is in the failed state.",
- workerTaskTags);
- taskStatusDestroyed = createTemplate("status-destroyed", TASK_GROUP_NAME, "Signals whether this task is in the destroyed state.",
- workerTaskTags);
+ taskStatus = createTemplate("status", TASK_GROUP_NAME,
+ "The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or " +
+ "'destroyed'.",
+ workerTaskTags);
taskRunningRatio = createTemplate("running-ratio", TASK_GROUP_NAME,
"The fraction of time this task has spent in the running state.", workerTaskTags);
taskPauseRatio = createTemplate("pause-ratio", TASK_GROUP_NAME, "The fraction of time this task has spent in the pause state.",
@@ -230,13 +245,55 @@ public class ConnectMetricsRegistry {
"committed/flushed/acknowledged by the sink task.",
sinkTaskTags);
sinkRecordActiveCountMax = createTemplate("sink-record-active-count-max", SINK_TASK_GROUP_NAME,
- "The maximum number of records that have been read from Kafka but not yet completely " +
- "committed/flushed/acknowledged by the sink task.",
+ "The maximum number of records that have been read from Kafka but not yet completely "
+ + "committed/flushed/acknowledged by the sink task.",
sinkTaskTags);
sinkRecordActiveCountAvg = createTemplate("sink-record-active-count-avg", SINK_TASK_GROUP_NAME,
- "The average number of records that have been read from Kafka but not yet completely " +
- "committed/flushed/acknowledged by the sink task.",
+ "The average number of records that have been read from Kafka but not yet completely "
+ + "committed/flushed/acknowledged by the sink task.",
sinkTaskTags);
+
+ /***** Worker level *****/
+ Set<String> workerTags = new LinkedHashSet<>(tags);
+
+ connectorCount = createTemplate("connector-count", WORKER_GROUP_NAME, "The number of connectors run in this worker.", workerTags);
+ taskCount = createTemplate("task-count", WORKER_GROUP_NAME, "The number of tasks run in this worker.", workerTags);
+ connectorStartupAttemptsTotal = createTemplate("connector-startup-attempts-total", WORKER_GROUP_NAME,
+ "The total number of connector startups that this worker has attempted.", workerTags);
+ connectorStartupSuccessTotal = createTemplate("connector-startup-success-total", WORKER_GROUP_NAME,
+ "The total number of connector starts that succeeded.", workerTags);
+ connectorStartupSuccessPercentage = createTemplate("connector-startup-success-percentage", WORKER_GROUP_NAME,
+ "The average percentage of this worker's connector starts that succeeded.", workerTags);
+ connectorStartupFailureTotal = createTemplate("connector-startup-failure-total", WORKER_GROUP_NAME,
+ "The total number of connector starts that failed.", workerTags);
+ connectorStartupFailurePercentage = createTemplate("connector-startup-failure-percentage", WORKER_GROUP_NAME,
+ "The average percentage of this worker's connector starts that failed.", workerTags);
+ taskStartupAttemptsTotal = createTemplate("task-startup-attempts-total", WORKER_GROUP_NAME,
+ "The total number of task startups that this worker has attempted.", workerTags);
+ taskStartupSuccessTotal = createTemplate("task-startup-success-total", WORKER_GROUP_NAME,
+ "The total number of task starts that succeeded.", workerTags);
+ taskStartupSuccessPercentage = createTemplate("task-startup-success-percentage", WORKER_GROUP_NAME,
+ "The average percentage of this worker's task starts that succeeded.", workerTags);
+ taskStartupFailureTotal = createTemplate("task-startup-failure-total", WORKER_GROUP_NAME,
+ "The total number of task starts that failed.", workerTags);
+ taskStartupFailurePercentage = createTemplate("task-startup-failure-percentage", WORKER_GROUP_NAME,
+ "The average percentage of this worker's task starts that failed.", workerTags);
+
+ /***** Worker rebalance level *****/
+ Set<String> rebalanceTags = new LinkedHashSet<>(tags);
+
+ leaderName = createTemplate("leader-name", WORKER_REBALANCE_GROUP_NAME, "The name of the group leader.", rebalanceTags);
+ epoch = createTemplate("epoch", WORKER_REBALANCE_GROUP_NAME, "The epoch or generation number of this worker.", rebalanceTags);
+ rebalanceCompletedTotal = createTemplate("completed-rebalances-total", WORKER_REBALANCE_GROUP_NAME,
+ "The total number of rebalances completed by this worker.", rebalanceTags);
+ rebalanceMode = createTemplate("rebalancing", WORKER_REBALANCE_GROUP_NAME,
+ "Whether this worker is currently rebalancing.", rebalanceTags);
+ rebalanceTimeMax = createTemplate("rebalance-max-time-ms", WORKER_REBALANCE_GROUP_NAME,
+ "The maximum time in milliseconds spent by this worker to rebalance.", rebalanceTags);
+ rebalanceTimeAvg = createTemplate("rebalance-avg-time-ms", WORKER_REBALANCE_GROUP_NAME,
+ "The average time in milliseconds spent by this worker to rebalance.", rebalanceTags);
+ rebalanceTimeSinceLast = createTemplate("time-since-last-rebalance-ms", WORKER_REBALANCE_GROUP_NAME,
+ "The time in milliseconds since this worker completed the most recent rebalance.", rebalanceTags);
}
private MetricNameTemplate createTemplate(String name, String group, String doc, Set<String> tags) {
@@ -272,4 +329,12 @@ public class ConnectMetricsRegistry {
public String sourceTaskGroupName() {
return SOURCE_TASK_GROUP_NAME;
}
+
+ public String workerGroupName() {
+ return WORKER_GROUP_NAME;
+ }
+
+ public String workerRebalanceGroupName() {
+ return WORKER_REBALANCE_GROUP_NAME;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 56bc341..c6e2e17 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -18,12 +18,18 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -68,6 +74,7 @@ public class Worker {
private final String workerId;
private final Plugins plugins;
private final ConnectMetrics metrics;
+ private final WorkerMetricsGroup workerMetricsGroup;
private final WorkerConfig config;
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
@@ -91,6 +98,8 @@ public class Worker {
this.time = time;
this.plugins = plugins;
this.config = config;
+ this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
+
// Internal converters are required properties, thus getClass won't return null.
this.internalKeyConverter = plugins.newConverter(
config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
@@ -164,6 +173,8 @@ public class Worker {
metrics.stop();
log.info("Worker stopped");
+
+ workerMetricsGroup.close();
}
/**
@@ -204,6 +215,7 @@ public class Worker {
// Can't be put in a finally block because it needs to be swapped before the call on
// statusListener
Plugins.compareAndSwapLoaders(savedLoader);
+ workerMetricsGroup.recordConnectorStartupFailure();
statusListener.onFailure(connName, t);
return false;
}
@@ -213,6 +225,7 @@ public class Worker {
throw new ConnectException("Connector with name " + connName + " already exists");
log.info("Finished creating connector {}", connName);
+ workerMetricsGroup.recordConnectorStartupSuccess();
return true;
}
@@ -396,6 +409,7 @@ public class Worker {
// Can't be put in a finally block because it needs to be swapped before the call on
// statusListener
Plugins.compareAndSwapLoaders(savedLoader);
+ workerMetricsGroup.recordTaskFailure();
statusListener.onFailure(id, t);
return false;
}
@@ -408,6 +422,7 @@ public class Worker {
if (workerTask instanceof WorkerSourceTask) {
sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
}
+ workerMetricsGroup.recordTaskSuccess();
return true;
}
@@ -583,4 +598,100 @@ public class Worker {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
+
+ WorkerMetricsGroup workerMetricsGroup() {
+ return workerMetricsGroup;
+ }
+
+ class WorkerMetricsGroup {
+ private final MetricGroup metricGroup;
+ private final Sensor connectorStartupAttempts;
+ private final Sensor connectorStartupSuccesses;
+ private final Sensor connectorStartupFailures;
+ private final Sensor connectorStartupResults;
+ private final Sensor taskStartupAttempts;
+ private final Sensor taskStartupSuccesses;
+ private final Sensor taskStartupFailures;
+ private final Sensor taskStartupResults;
+
+ public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup = connectMetrics.group(registry.workerGroupName());
+
+ metricGroup.addValueMetric(registry.connectorCount, new LiteralSupplier<Double>() {
+ @Override
+ public Double metricValue(long now) {
+ return (double) connectors.size();
+ }
+ });
+ metricGroup.addValueMetric(registry.taskCount, new LiteralSupplier<Double>() {
+ @Override
+ public Double metricValue(long now) {
+ return (double) tasks.size();
+ }
+ });
+
+ MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+ MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+ Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+ connectorStartupResults = metricGroup.sensor("connector-startup-results");
+ connectorStartupResults.add(connectorStartupResultFrequencies);
+
+ connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+ connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total());
+
+ connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+ connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total());
+
+ connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+ connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new Total());
+
+ MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+ MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+ Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+ taskStartupResults = metricGroup.sensor("task-startup-results");
+ taskStartupResults.add(taskStartupResultFrequencies);
+
+ taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+ taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total());
+
+ taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+ taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new Total());
+
+ taskStartupFailures = metricGroup.sensor("task-startup-failures");
+ taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new Total());
+ }
+
+ void close() {
+ metricGroup.close();
+ }
+
+ void recordConnectorStartupFailure() {
+ connectorStartupAttempts.record(1.0);
+ connectorStartupFailures.record(1.0);
+ connectorStartupResults.record(0.0);
+ }
+
+ void recordConnectorStartupSuccess() {
+ connectorStartupAttempts.record(1.0);
+ connectorStartupSuccesses.record(1.0);
+ connectorStartupResults.record(1.0);
+ }
+
+ void recordTaskFailure() {
+ taskStartupAttempts.record(1.0);
+ taskStartupFailures.record(1.0);
+ taskStartupResults.record(0.0);
+ }
+
+ void recordTaskSuccess() {
+ taskStartupAttempts.record(1.0);
+ taskStartupSuccesses.record(1.0);
+ taskStartupResults.record(1.0);
+ }
+
+ protected MetricGroup metricGroup() {
+ return metricGroup;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 21104bd..9e65cd2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -16,16 +16,18 @@
*/
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
/**
* Container for connectors which is responsible for managing their lifecycle (e.g. handling startup,
@@ -199,6 +201,18 @@ public class WorkerConnector {
return SinkConnector.class.isAssignableFrom(connector.getClass());
}
+ public boolean isSourceConnector() {
+ return SourceConnector.class.isAssignableFrom(connector.getClass());
+ }
+
+ protected String connectorType() {
+ if (isSinkConnector())
+ return "sink";
+ if (isSourceConnector())
+ return "source";
+ return "unknown";
+ }
+
public Connector connector() {
return connector;
}
@@ -224,22 +238,23 @@ public class WorkerConnector {
private final ConnectorStatus.Listener delegate;
public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
+ Objects.requireNonNull(connectMetrics);
+ Objects.requireNonNull(connector);
+ Objects.requireNonNull(initialState);
+ Objects.requireNonNull(delegate);
this.delegate = delegate;
this.state = initialState;
ConnectMetricsRegistry registry = connectMetrics.registry();
this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
registry.connectorTagName(), connName);
- addStateMetric(AbstractStatus.State.RUNNING, registry.connectorStatusRunning);
- addStateMetric(AbstractStatus.State.PAUSED, registry.connectorStatusPaused);
- addStateMetric(AbstractStatus.State.FAILED, registry.connectorStatusFailed);
- }
-
- private void addStateMetric(final AbstractStatus.State matchingState, MetricNameTemplate nameTemplate) {
- metricGroup.addIndicatorMetric(nameTemplate, new IndicatorPredicate() {
+ metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
+ metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
+ metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version());
+ metricGroup.addValueMetric(registry.connectorStatus, new LiteralSupplier<String>() {
@Override
- public boolean matches() {
- return state == matchingState;
+ public String metricValue(long now) {
+ return state.toString().toLowerCase(Locale.getDefault());
}
});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 6499ac2..ec06924 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -26,13 +26,14 @@ import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.AbstractStatus.State;
-import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -313,11 +314,12 @@ abstract class WorkerTask implements Runnable {
registry.connectorTagName(), id.connector(),
registry.taskTagName(), Integer.toString(id.task()));
- addTaskStateMetric(State.UNASSIGNED, registry.taskStatusUnassigned);
- addTaskStateMetric(State.RUNNING, registry.taskStatusRunning);
- addTaskStateMetric(State.PAUSED, registry.taskStatusPaused);
- addTaskStateMetric(State.FAILED, registry.taskStatusDestroyed);
- addTaskStateMetric(State.DESTROYED, registry.taskStatusDestroyed);
+ metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() {
+ @Override
+ public String metricValue(long now) {
+ return taskStateTimer.currentState().toString().toLowerCase(Locale.getDefault());
+ }
+ });
addRatioMetric(State.RUNNING, registry.taskRunningRatio);
addRatioMetric(State.PAUSED, registry.taskPauseRatio);
@@ -337,15 +339,6 @@ abstract class WorkerTask implements Runnable {
commitAttempts.add(commitFrequencies);
}
- private void addTaskStateMetric(final State matchingState, MetricNameTemplate template) {
- metricGroup.addIndicatorMetric(template, new IndicatorPredicate() {
- @Override
- public boolean matches() {
- return matchingState == taskStateTimer.currentState();
- }
- });
- }
-
private void addRatioMetric(final State matchingState, MetricNameTemplate template) {
MetricName metricName = metricGroup.metricName(template);
if (metricGroup.metrics().metric(metricName) == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 390f8c3..4d3d07b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -19,6 +19,10 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -28,6 +32,10 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
@@ -106,6 +114,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final AtomicLong requestSeqNum = new AtomicLong();
private final Time time;
+ private final HerderMetrics herderMetrics;
private final String workerGroupId;
private final int workerSyncTimeoutMs;
@@ -143,7 +152,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl) {
- this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, time);
+ this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -155,10 +164,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ConfigBackingStore configBackingStore,
WorkerGroupMember member,
String restUrl,
+ ConnectMetrics metrics,
Time time) {
super(worker, workerId, statusBackingStore, configBackingStore);
this.time = time;
+ this.herderMetrics = new HerderMetrics(metrics);
this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
@@ -202,6 +213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
halt();
log.info("Herder stopped");
+ herderMetrics.close();
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Exit.exit(1);
@@ -781,6 +793,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// We only mark this as resolved once we've actually started work, which allows us to correctly track whether
// what work is currently active and running. If we bail early, the main tick loop + having requested rejoin
// guarantees we'll attempt to rejoin before executing this method again.
+ herderMetrics.rebalanceSucceeded(time.milliseconds());
rebalanceResolved = true;
return true;
}
@@ -1163,6 +1176,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
+ protected HerderMetrics herderMetrics() {
+ return herderMetrics;
+ }
+
// Rebalances are triggered internally from the group member, so these are always executed in the work thread.
public class RebalanceListener implements WorkerRebalanceListener {
@Override
@@ -1177,6 +1194,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
DistributedHerder.this.assignment = assignment;
DistributedHerder.this.generation = generation;
rebalanceResolved = false;
+ herderMetrics.rebalanceStarted(time.milliseconds());
}
// Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
@@ -1230,4 +1248,71 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
+ class HerderMetrics {
+ private final MetricGroup metricGroup;
+ private final Sensor rebalanceCompletedCounts;
+ private final Sensor rebalanceTime;
+ private volatile long lastRebalanceCompletedAtMillis = Long.MIN_VALUE;
+ private volatile boolean rebalancing = false;
+ private volatile long rebalanceStartedAtMillis = 0L;
+
+ public HerderMetrics(ConnectMetrics connectMetrics) {
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup = connectMetrics.group(registry.workerRebalanceGroupName());
+
+ metricGroup.addValueMetric(registry.leaderName, new LiteralSupplier<String>() {
+ @Override
+ public String metricValue(long now) {
+ return leaderUrl();
+ }
+ });
+ metricGroup.addValueMetric(registry.epoch, new LiteralSupplier<Double>() {
+ @Override
+ public Double metricValue(long now) {
+ return (double) generation;
+ }
+ });
+ metricGroup.addValueMetric(registry.rebalanceMode, new LiteralSupplier<Double>() {
+ @Override
+ public Double metricValue(long now) {
+ return rebalancing ? 1.0d : 0.0d;
+ }
+ });
+
+ rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count");
+ rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total());
+
+ rebalanceTime = metricGroup.sensor("rebalance-time");
+ rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());
+ rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeAvg), new Avg());
+
+ metricGroup.addValueMetric(registry.rebalanceTimeSinceLast, new LiteralSupplier<Double>() {
+ @Override
+ public Double metricValue(long now) {
+ return lastRebalanceCompletedAtMillis == Long.MIN_VALUE ? Double.POSITIVE_INFINITY : (double) (now - lastRebalanceCompletedAtMillis);
+ }
+ });
+ }
+
+ void close() {
+ metricGroup.close();
+ }
+
+ void rebalanceStarted(long now) {
+ rebalanceStartedAtMillis = now;
+ rebalancing = true;
+ }
+
+ void rebalanceSucceeded(long now) {
+ long duration = Math.max(0L, now - rebalanceStartedAtMillis);
+ rebalancing = false;
+ rebalanceCompletedCounts.record(1.0);
+ rebalanceTime.record(duration);
+ lastRebalanceCompletedAtMillis = now;
+ }
+
+ protected MetricGroup metricGroup() {
+ return metricGroup;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 6cc6db7..f1df140 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -56,6 +56,10 @@ public class MockConnectMetrics extends ConnectMetrics {
this(new MockTime());
}
+ public MockConnectMetrics(org.apache.kafka.common.utils.MockTime time) {
+ super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
+ }
+
public MockConnectMetrics(MockTime time) {
super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
}
@@ -73,28 +77,81 @@ public class MockConnectMetrics extends ConnectMetrics {
* @param name the name of the metric
* @return the current value of the metric
*/
- public double currentMetricValue(MetricGroup metricGroup, String name) {
+ public Object currentMetricValue(MetricGroup metricGroup, String name) {
+ return currentMetricValue(this, metricGroup, name);
+ }
+
+ /**
+ * Get the current value of the named metric, which may have already been removed from the
+ * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+ *
+ * @param metricGroup the metric metricGroup that contained the metric
+ * @param name the name of the metric
+ * @return the current value of the metric
+ */
+ public double currentMetricValueAsDouble(MetricGroup metricGroup, String name) {
+ Object value = currentMetricValue(metricGroup, name);
+ return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN;
+ }
+
+ /**
+ * Get the current value of the named metric, which may have already been removed from the
+ * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+ *
+ * @param metricGroup the metric metricGroup that contained the metric
+ * @param name the name of the metric
+ * @return the current value of the metric
+ */
+ public String currentMetricValueAsString(MetricGroup metricGroup, String name) {
+ Object value = currentMetricValue(metricGroup, name);
+ return value instanceof String ? (String) value : null;
+ }
+
+ /**
+ * Get the current value of the named metric, which may have already been removed from the
+ * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+ *
+ * @param metrics the {@link ConnectMetrics} instance
+ * @param metricGroup the metric metricGroup that contained the metric
+ * @param name the name of the metric
+ * @return the current value of the metric
+ */
+ public static Object currentMetricValue(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
MetricName metricName = metricGroup.metricName(name);
- for (MetricsReporter reporter : metrics().reporters()) {
+ for (MetricsReporter reporter : metrics.metrics().reporters()) {
if (reporter instanceof MockMetricsReporter) {
return ((MockMetricsReporter) reporter).currentMetricValue(metricName);
}
}
- return Double.NEGATIVE_INFINITY;
+ return null;
}
/**
- * Determine if the {@link KafkaMetric} with the specified name exists within the
- * {@link org.apache.kafka.common.metrics.Metrics} instance.
+ * Get the current value of the named metric, which may have already been removed from the
+ * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
*
+ * @param metrics the {@link ConnectMetrics} instance
* @param metricGroup the metric metricGroup that contained the metric
* @param name the name of the metric
- * @return true if the metric is still register, or false if it has been removed
+ * @return the current value of the metric
*/
- public boolean metricExists(MetricGroup metricGroup, String name) {
- MetricName metricName = metricGroup.metricName(name);
- KafkaMetric metric = metricGroup.metrics().metric(metricName);
- return metric != null;
+ public static double currentMetricValueAsDouble(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
+ Object value = currentMetricValue(metrics, metricGroup, name);
+ return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN;
+ }
+
+ /**
+ * Get the current value of the named metric, which may have already been removed from the
+ * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed.
+ *
+ * @param metrics the {@link ConnectMetrics} instance
+ * @param metricGroup the metric metricGroup that contained the metric
+ * @param name the name of the metric
+ * @return the current value of the metric
+ */
+ public static String currentMetricValueAsString(ConnectMetrics metrics, MetricGroup metricGroup, String name) {
+ Object value = currentMetricValue(metrics, metricGroup, name);
+ return value instanceof String ? (String) value : null;
}
public static class MockMetricsReporter implements MetricsReporter {
@@ -136,10 +193,9 @@ public class MockConnectMetrics extends ConnectMetrics {
* @param metricName the name of the metric that was registered most recently
* @return the current value of the metric
*/
- @SuppressWarnings("deprecation")
- public double currentMetricValue(MetricName metricName) {
+ public Object currentMetricValue(MetricName metricName) {
KafkaMetric metric = metricsByName.get(metricName);
- return metric != null ? metric.value() : Double.NaN;
+ return metric != null ? metric.metricValue() : null;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 5f03f5a..10c413d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -32,12 +33,15 @@ import java.util.HashMap;
import java.util.Map;
import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
public class WorkerConnectorTest extends EasyMockSupport {
+ private static final String VERSION = "1.1";
public static final String CONNECTOR = "connector";
public static final Map<String, String> CONFIG = new HashMap<>();
static {
@@ -45,7 +49,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
}
public ConnectorConfig connectorConfig;
- public ConnectMetrics metrics;
+ public MockConnectMetrics metrics;
@Mock Plugins plugins;
@Mock Connector connector;
@@ -67,6 +71,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
public void testInitializeFailure() {
RuntimeException exception = new RuntimeException();
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall().andThrow(exception);
@@ -92,6 +99,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
public void testFailureIsFinalState() {
RuntimeException exception = new RuntimeException();
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall().andThrow(exception);
@@ -119,6 +129,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupAndShutdown() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -150,6 +163,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupAndPause() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -186,6 +202,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testOnResume() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -222,6 +241,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupPaused() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -251,6 +273,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -281,6 +306,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -312,6 +340,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testTransitionStartedToStarted() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -346,6 +377,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testTransitionPausedToPaused() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
connector.initialize(EasyMock.notNull(ConnectorContext.class));
expectLastCall();
@@ -415,6 +449,14 @@ public class WorkerConnectorTest extends EasyMockSupport {
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
+ MetricGroup metricGroup = workerConnector.metrics().metricGroup();
+ String status = metrics.currentMetricValueAsString(metricGroup, "status");
+ String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
+ String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
+ String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
+ assertEquals(type, "unknown");
+ assertNotNull(clazz);
+ assertEquals(VERSION, version);
}
private static abstract class TestConnector extends Connector {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 782d66b..50b091d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -180,8 +180,7 @@ public class WorkerSinkTaskTest {
time.sleep(10000L);
assertSinkMetricValue("partition-count", 2);
- assertTaskMetricValue("status-running", 0.0);
- assertTaskMetricValue("status-paused", 1.0);
+ assertTaskMetricValue("status", "paused");
assertTaskMetricValue("running-ratio", 0.0);
assertTaskMetricValue("pause-ratio", 1.0);
assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
@@ -253,8 +252,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
@@ -272,8 +270,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 0.0);
- assertTaskMetricValue("status-paused", 1.0);
+ assertTaskMetricValue("status", "paused");
assertTaskMetricValue("running-ratio", 0.25);
assertTaskMetricValue("pause-ratio", 0.75);
@@ -333,8 +330,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-rate", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 0.0);
@@ -352,8 +348,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count", 1.0);
assertSinkMetricValue("sink-record-active-count-max", 1.0);
assertSinkMetricValue("sink-record-active-count-avg", 0.5);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("batch-size-max", 1.0);
assertTaskMetricValue("batch-size-avg", 0.5);
@@ -492,8 +487,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
@@ -560,8 +554,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
@@ -588,8 +581,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 0.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 1.0);
@@ -991,8 +983,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 2.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
assertSinkMetricValue("offset-commit-skip-total", 1.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 2.0);
@@ -1026,8 +1017,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 3.0);
assertSinkMetricValue("offset-commit-completion-total", 2.0);
assertSinkMetricValue("offset-commit-skip-total", 1.0);
- assertTaskMetricValue("status-running", 1.0);
- assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("status", "running");
assertTaskMetricValue("running-ratio", 1.0);
assertTaskMetricValue("pause-ratio", 0.0);
assertTaskMetricValue("batch-size-max", 2.0);
@@ -1089,7 +1079,7 @@ public class WorkerSinkTaskTest {
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
- assertEquals(1.0, metrics.currentMetricValue(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
+ assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
PowerMock.verifyAll();
}
@@ -1289,16 +1279,22 @@ public class WorkerSinkTaskTest {
private void assertSinkMetricValue(String name, double expected) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
- double measured = metrics.currentMetricValue(sinkTaskGroup, name);
+ double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
assertEquals(expected, measured, 0.001d);
}
private void assertTaskMetricValue(String name, double expected) {
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double measured = metrics.currentMetricValue(taskGroup, name);
+ double measured = metrics.currentMetricValueAsDouble(taskGroup, name);
assertEquals(expected, measured, 0.001d);
}
+ private void assertTaskMetricValue(String name, String expected) {
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ String measured = metrics.currentMetricValueAsString(taskGroup, name);
+ assertEquals(expected, measured);
+ }
+
private void printMetrics() {
System.out.println("");
sinkMetricValue("sink-record-read-rate");
@@ -1334,14 +1330,14 @@ public class WorkerSinkTaskTest {
private double sinkMetricValue(String metricName) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
- double value = metrics.currentMetricValue(sinkTaskGroup, metricName);
+ double value = metrics.currentMetricValueAsDouble(sinkTaskGroup, metricName);
System.out.println("** " + metricName + "=" + value);
return value;
}
private double taskMetricValue(String metricName) {
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double value = metrics.currentMetricValue(taskGroup, metricName);
+ double value = metrics.currentMetricValueAsDouble(taskGroup, metricName);
System.out.println("** " + metricName + "=" + value);
return value;
}
@@ -1350,10 +1346,10 @@ public class WorkerSinkTaskTest {
private void assertMetrics(int minimumPollCountExpected) {
MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double readRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-rate");
- double readTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-total");
- double sendRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-rate");
- double sendTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-total");
+ double readRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-rate");
+ double readTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-total");
+ double sendRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-rate");
+ double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total");
}
private abstract static class TestSinkTask extends SinkTask {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 928ccb9..4f0d243 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -615,12 +615,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
group.recordPoll(100, 1000 + i * 100);
group.recordWrite(10);
}
- assertEquals(1900.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
- assertEquals(1450.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
- assertEquals(33.333, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-rate"), 0.001d);
- assertEquals(1000, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-total"), 0.001d);
- assertEquals(3.3333, metrics.currentMetricValue(group.metricGroup(), "source-record-write-rate"), 0.001d);
- assertEquals(100, metrics.currentMetricValue(group.metricGroup(), "source-record-write-total"), 0.001d);
+ assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+ assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+ assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
+ assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
+ assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+ assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
}
private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
@@ -795,19 +795,19 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private void assertPollMetrics(int minimumPollCountExpected) {
MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
- double pollRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-rate");
- double pollTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-total");
+ double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate");
+ double pollTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-total");
if (minimumPollCountExpected > 0) {
- assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-max"), 0.000001d);
- assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-avg"), 0.000001d);
+ assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 0.000001d);
+ assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d);
assertTrue(pollRate > 0.0d);
} else {
assertTrue(pollRate == 0.0d);
}
assertTrue(pollTotal >= minimumPollCountExpected);
- double writeRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-rate");
- double writeTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-total");
+ double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate");
+ double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total");
if (minimumPollCountExpected > 0) {
assertTrue(writeRate > 0.0d);
} else {
@@ -815,14 +815,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
assertTrue(writeTotal >= minimumPollCountExpected);
- double pollBatchTimeMax = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-max-time-ms");
- double pollBatchTimeAvg = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-avg-time-ms");
+ double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms");
+ double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms");
if (minimumPollCountExpected > 0) {
assertTrue(pollBatchTimeMax >= 0.0d);
}
assertTrue(pollBatchTimeAvg >= 0.0d);
- double activeCount = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count");
- double activeCountMax = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count-max");
+ double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
+ double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
assertEquals(0, activeCount, 0.000001d);
if (minimumPollCountExpected > 0) {
assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a47bfbca/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 96746a5..516b71a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -314,8 +314,8 @@ public class WorkerTaskTest {
long totalTime = 27000L;
double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime;
- assertEquals(pauseTimeRatio, metrics.currentMetricValue(group.metricGroup(), "pause-ratio"), 0.000001d);
- assertEquals(runningTimeRatio, metrics.currentMetricValue(group.metricGroup(), "running-ratio"), 0.000001d);
+ assertEquals(pauseTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "pause-ratio"), 0.000001d);
+ assertEquals(runningTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "running-ratio"), 0.000001d);
}
private static abstract class TestSinkTask extends SinkTask {