You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/10/03 18:52:17 UTC
kafka git commit: KAFKA-5902: Added sink task metrics (KIP-196)
Repository: kafka
Updated Branches:
refs/heads/trunk 5663f51ed -> 05357b703
KAFKA-5902: Added sink task metrics (KIP-196)
Added Connect metrics specific to sink tasks. This builds upon #3864 and #3911, which have already been merged into `trunk`, and #3959, which has yet to be merged.
I'll rebase this PR when the latter is merged.
Author: Randall Hauch <rh...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #3975 from rhauch/kafka-5902
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05357b70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05357b70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05357b70
Branch: refs/heads/trunk
Commit: 05357b7030c784a6548453f533d3c00e19548ba2
Parents: 5663f51
Author: Randall Hauch <rh...@gmail.com>
Authored: Tue Oct 3 11:52:14 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Oct 3 11:52:14 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/StateTracker.java | 2 +-
.../kafka/connect/runtime/WorkerSinkTask.java | 209 +++++++++++++++
.../kafka/connect/runtime/WorkerSourceTask.java | 81 ++++--
.../connect/runtime/MockConnectMetrics.java | 9 +-
.../connect/runtime/WorkerSinkTaskTest.java | 267 ++++++++++++++++++-
.../connect/runtime/WorkerSourceTaskTest.java | 6 +
6 files changed, 540 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 566a6fc..f2f0986 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -167,7 +167,7 @@ public class StateTracker {
}
long total = durationCurrent + unassignedTotalTimeMs + runningTotalTimeMs + pausedTotalTimeMs +
failedTotalTimeMs + destroyedTotalTimeMs;
- return (double) durationDesired / total;
+ return total == 0.0d ? 0.0d : (double) durationDesired / total;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8f0f7fd..3cb68b7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -26,11 +26,18 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
@@ -62,6 +69,7 @@ class WorkerSinkTask extends WorkerTask {
private final Converter keyConverter;
private final Converter valueConverter;
private final TransformationChain<SinkRecord> transformationChain;
+ private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private final List<SinkRecord> messageBatch;
@@ -106,6 +114,8 @@ class WorkerSinkTask extends WorkerTask {
this.commitSeqno = 0;
this.commitStarted = -1;
this.commitFailures = 0;
+ this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
+ this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
}
@Override
@@ -139,6 +149,7 @@ class WorkerSinkTask extends WorkerTask {
@Override
protected void releaseResources() {
+ sinkTaskMetricsGroup.close();
}
@Override
@@ -215,6 +226,7 @@ class WorkerSinkTask extends WorkerTask {
if (commitSeqno != seqno) {
log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}",
this, seqno, commitSeqno);
+ sinkTaskMetricsGroup.recordOffsetCommitSkip();
} else {
long durationMillis = time.milliseconds() - commitStarted;
if (error != null) {
@@ -228,6 +240,7 @@ class WorkerSinkTask extends WorkerTask {
if (committedOffsets != null) {
log.debug("{} Setting last committed offsets to {}", this, committedOffsets);
lastCommittedOffsets = committedOffsets;
+ sinkTaskMetricsGroup.recordCommittedOffsets(committedOffsets);
}
commitFailures = 0;
recordCommitSuccess(durationMillis);
@@ -322,6 +335,7 @@ class WorkerSinkTask extends WorkerTask {
committing = true;
commitSeqno += 1;
commitStarted = now;
+ sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
try {
@@ -400,6 +414,7 @@ class WorkerSinkTask extends WorkerTask {
throw e;
}
+ sinkTaskMetricsGroup.recordRead(msgs.count());
return msgs;
}
@@ -456,6 +471,7 @@ class WorkerSinkTask extends WorkerTask {
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
}
}
+ sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
}
private void resumeAll() {
@@ -473,8 +489,10 @@ class WorkerSinkTask extends WorkerTask {
try {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
+ long start = time.milliseconds();
task.put(new ArrayList<>(messageBatch));
recordBatch(messageBatch.size());
+ sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
currentOffsets.putAll(origOffsets);
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
@@ -519,11 +537,34 @@ class WorkerSinkTask extends WorkerTask {
}
private void openPartitions(Collection<TopicPartition> partitions) {
+ sinkTaskMetricsGroup.recordPartitionCount(partitions.size());
task.open(partitions);
}
private void closePartitions() {
commitOffsets(time.milliseconds(), true);
+ sinkTaskMetricsGroup.recordPartitionCount(0);
+ }
+
+ @Override
+ protected void recordBatch(int size) {
+ super.recordBatch(size);
+ sinkTaskMetricsGroup.recordSend(size);
+ }
+
+ @Override
+ protected void recordCommitFailure(long duration, Throwable error) {
+ super.recordCommitFailure(duration, error);
+ }
+
+ @Override
+ protected void recordCommitSuccess(long duration) {
+ super.recordCommitSuccess(duration);
+ sinkTaskMetricsGroup.recordOffsetCommitSuccess();
+ }
+
+ SinkTaskMetricsGroup sinkTaskMetricsGroup() {
+ return sinkTaskMetricsGroup;
}
private class HandleRebalance implements ConsumerRebalanceListener {
@@ -538,6 +579,7 @@ class WorkerSinkTask extends WorkerTask {
currentOffsets.put(tp, new OffsetAndMetadata(pos));
log.debug("{} Assigned topic partition {} with offset {}", this, tp, pos);
}
+ sinkTaskMetricsGroup.assignedOffsets(currentOffsets);
// If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
// sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
@@ -572,6 +614,7 @@ class WorkerSinkTask extends WorkerTask {
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();
+ sinkTaskMetricsGroup.clearOffsets();
} catch (RuntimeException e) {
// The consumer swallows exceptions raised in the rebalance listener, so we need to store
// exceptions and rethrow when poll() returns.
@@ -582,4 +625,170 @@ class WorkerSinkTask extends WorkerTask {
messageBatch.clear();
}
}
+
+ /**
+  * Holds the sensors of the "sink-task-metrics" group for a single sink task, tagged with the
+  * connector name and task number, and derives the "active record" count from the consumed and
+  * committed offsets reported to it.
+  *
+  * <p>Offset maps handed to this class are copied before being stored, so that
+  * {@link #clearOffsets()} only ever empties this group's own bookkeeping and never a map that is
+  * still owned by the caller.
+  *
+  * <p>NOTE(review): none of the methods are synchronized; this presumably relies on all calls coming
+  * from the task's own thread -- confirm against the callers.
+  */
+ static class SinkTaskMetricsGroup {
+     private final MetricGroup metricGroup;
+     private final Sensor sinkRecordRead;
+     private final Sensor sinkRecordSend;
+     private final Sensor partitionCount;
+     private final Sensor offsetSeqNum;
+     private final Sensor offsetCompletion;
+     private final Sensor offsetCompletionSkip;
+     private final Sensor putBatchTime;
+     private final Sensor sinkRecordActiveCount;
+     // Result of the most recent computeSinkRecordLag() call.
+     private long activeRecords;
+     // Private snapshots; never aliases of maps owned by the caller.
+     private Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
+     private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+
+     /**
+      * Creates the metric group and registers every sensor and metric of the sink task.
+      *
+      * @param id             the task whose connector name and task number become the group tags
+      * @param connectMetrics the metrics registry from which the group is obtained
+      */
+     public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
+         metricGroup = connectMetrics.group("sink-task-metrics",
+                 "connector", id.connector(), "task", Integer.toString(id.task()));
+
+         sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+         sinkRecordRead.add(metricGroup.metricName("sink-record-read-rate",
+                 "The average per-second number of records read from Kafka for this task belonging to the " +
+                 "named sink connector in this worker. This is before transformations are applied."),
+                 new Rate());
+         sinkRecordRead.add(metricGroup.metricName("sink-record-read-total",
+                 "The total number of records produced/polled (before transformation) by this task belonging " +
+                 "to the named sink connector in this worker, since the task was last restarted."),
+                 new Total());
+
+         sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+         sinkRecordSend.add(metricGroup.metricName("sink-record-send-rate",
+                 "The average per-second number of records output from the transformations and sent/put to " +
+                 "this task belonging to the named sink connector in this worker. This is after transformations " +
+                 "are applied and excludes any records filtered out by the transformations."),
+                 new Rate());
+         sinkRecordSend.add(metricGroup.metricName("sink-record-send-total",
+                 "The total number of records output from the transformations and sent/put to this task " +
+                 "belonging to the named sink connector in this worker, since the task was last restarted."),
+                 new Total());
+
+         // The three descriptions below need the trailing space before the concatenation,
+         // otherwise they render as "...acknowledgedby the sink task".
+         sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+         sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count",
+                 "The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                 "by the sink task"),
+                 new Value());
+         sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-max",
+                 "The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                 "by the sink task"),
+                 new Max());
+         sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-avg",
+                 "The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                 "by the sink task"),
+                 new Avg());
+
+         partitionCount = metricGroup.metrics().sensor("partition-count");
+         partitionCount.add(metricGroup.metricName("partition-count",
+                 "The number of topic partitions assigned to this task belonging to the named sink connector in this worker."),
+                 new Value());
+
+         offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+         offsetSeqNum.add(metricGroup.metricName("offset-commit-seq-no",
+                 "The current sequence number for offset commits"),
+                 new Value());
+
+         offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
+         offsetCompletion.add(metricGroup.metricName("offset-commit-completion-rate",
+                 "The average per-second number of offset commit completions that were completed successfully."),
+                 new Rate());
+         offsetCompletion.add(metricGroup.metricName("offset-commit-completion-total",
+                 "The total number of offset commit completions that were completed successfully."),
+                 new Total());
+
+         offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
+         offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-rate",
+                 "The average per-second number of offset commit completions that were received too late and skipped/ignored."),
+                 new Rate());
+         offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-total",
+                 "The total number of offset commit completions that were received too late and skipped/ignored."),
+                 new Total());
+
+         putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+         putBatchTime.add(metricGroup.metricName("put-batch-max-time-ms",
+                 "The maximum time taken by this task to put a batch of sink records."),
+                 new Max());
+         putBatchTime.add(metricGroup.metricName("put-batch-avg-time-ms",
+                 "The average time taken by this task to put a batch of sink records."),
+                 new Avg());
+     }
+
+     /**
+      * Recomputes the number of active records as the sum, over every partition that has both a
+      * committed and a consumed offset, of {@code consumed - committed} (negative differences count
+      * as zero), and records the result on the active-count sensor.
+      */
+     void computeSinkRecordLag() {
+         Map<TopicPartition, OffsetAndMetadata> consumed = this.consumedOffsets;
+         Map<TopicPartition, OffsetAndMetadata> committed = this.committedOffsets;
+         activeRecords = 0L;
+         for (Map.Entry<TopicPartition, OffsetAndMetadata> committedOffsetEntry : committed.entrySet()) {
+             final TopicPartition partition = committedOffsetEntry.getKey();
+             final OffsetAndMetadata consumedOffsetMeta = consumed.get(partition);
+             if (consumedOffsetMeta != null) {
+                 final OffsetAndMetadata committedOffsetMeta = committedOffsetEntry.getValue();
+                 long consumedOffset = consumedOffsetMeta.offset();
+                 long committedOffset = committedOffsetMeta.offset();
+                 long diff = consumedOffset - committedOffset;
+                 // Connector tasks can return offsets, so make sure nothing wonky happens
+                 activeRecords += Math.max(diff, 0L);
+             }
+         }
+         sinkRecordActiveCount.record(activeRecords);
+     }
+
+     /** Closes the underlying metric group. */
+     void close() {
+         metricGroup.close();
+     }
+
+     /** Records the number of records in a batch read from Kafka. */
+     void recordRead(int batchSize) {
+         sinkRecordRead.record(batchSize);
+     }
+
+     /** Records the number of records in a batch sent/put to the task. */
+     void recordSend(int batchSize) {
+         sinkRecordSend.record(batchSize);
+     }
+
+     /** Records the time, in milliseconds, that one put of a batch took. */
+     void recordPut(long duration) {
+         putBatchTime.record(duration);
+     }
+
+     /** Records the number of topic partitions currently assigned to the task. */
+     void recordPartitionCount(int assignedPartitionCount) {
+         partitionCount.record(assignedPartitionCount);
+     }
+
+     /** Records the current offset commit sequence number. */
+     void recordOffsetSequenceNumber(int seqNum) {
+         offsetSeqNum.record(seqNum);
+     }
+
+     /**
+      * Merges the given consumed offsets into the tracked ones and refreshes the active-record count.
+      *
+      * @param offsets the latest consumed offset per partition; entries are copied
+      */
+     void recordConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+         consumedOffsets.putAll(offsets);
+         computeSinkRecordLag();
+     }
+
+     /**
+      * Replaces the tracked committed offsets and refreshes the active-record count.
+      *
+      * @param offsets the committed offset per partition; copied so that a later
+      *                {@link #clearOffsets()} cannot empty the caller's map
+      */
+     void recordCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+         committedOffsets = new HashMap<>(offsets);
+         computeSinkRecordLag();
+     }
+
+     /**
+      * Resets both the consumed and the committed offsets to the given assignment and records an
+      * active-record count of zero.
+      *
+      * @param offsets the starting offset per newly assigned partition; copied, never retained
+      */
+     void assignedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+         consumedOffsets = new HashMap<>(offsets);
+         committedOffsets = new HashMap<>(offsets);
+         sinkRecordActiveCount.record(0.0);
+     }
+
+     /** Forgets all tracked offsets and records an active-record count of zero. */
+     void clearOffsets() {
+         consumedOffsets.clear();
+         committedOffsets.clear();
+         sinkRecordActiveCount.record(0.0);
+     }
+
+     /** Records one successfully completed offset commit. */
+     void recordOffsetCommitSuccess() {
+         offsetCompletion.record(1.0);
+     }
+
+     /** Records one offset commit completion that arrived too late and was ignored. */
+     void recordOffsetCommitSkip() {
+         offsetCompletionSkip.record(1.0);
+     }
+
+     /** Returns the underlying metric group. */
+     protected MetricGroup metricGroup() {
+         return metricGroup;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0f1874e..9a187d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@@ -205,10 +206,12 @@ class WorkerSourceTask extends WorkerTask {
private boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
+ final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
for (final SourceRecord preTransformRecord : toSend) {
final SourceRecord record = transformationChain.apply(preTransformRecord);
if (record == null) {
+ counter.skipRecord();
commitTaskRecord(preTransformRecord);
continue;
}
@@ -256,6 +259,7 @@ class WorkerSourceTask extends WorkerTask {
commitTaskRecord(preTransformRecord);
}
recordSent(producerRecord);
+ counter.completeRecord();
}
});
lastSendFailed = false;
@@ -263,6 +267,7 @@ class WorkerSourceTask extends WorkerTask {
log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
toSend = toSend.subList(processed, toSend.size());
lastSendFailed = true;
+ counter.retryRemaining();
return false;
} catch (KafkaException e) {
throw new ConnectException("Unrecoverable exception trying to send", e);
@@ -282,7 +287,6 @@ class WorkerSourceTask extends WorkerTask {
}
private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
- recordWriteCompleted(1);
ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing)
@@ -440,19 +444,50 @@ class WorkerSourceTask extends WorkerTask {
sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
}
- protected void recordWriteCompleted(int numRecords) {
- sourceTaskMetricsGroup.recordWrite(numRecords);
- }
-
SourceTaskMetricsGroup sourceTaskMetricsGroup() {
return sourceTaskMetricsGroup;
}
+ /**
+  * Counts down the records of one batch as they are skipped or completed and reports the number
+  * handled so far to the metrics group exactly once: either when the count reaches zero or when
+  * {@link #retryRemaining()} is called, whichever happens first.
+  */
+ static class SourceRecordWriteCounter {
+     private final SourceTaskMetricsGroup metricsGroup;
+     private final int batchSize;
+     private boolean completed = false;
+     private int counter;
+
+     /**
+      * @param batchSize    the number of records in the batch; asserted to be positive
+      * @param metricsGroup the group that receives the write count; asserted to be non-null
+      */
+     public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
+         assert batchSize > 0;
+         assert metricsGroup != null;
+         this.batchSize = batchSize;
+         this.counter = batchSize;
+         this.metricsGroup = metricsGroup;
+     }
+
+     /** Accounts for one record of the batch that was skipped. */
+     public void skipRecord() {
+         countDown();
+     }
+
+     /** Accounts for one record of the batch that was completed. */
+     public void completeRecord() {
+         countDown();
+     }
+
+     /** Reports what has been handled so far; later calls no longer report anything. */
+     public void retryRemaining() {
+         finishedAllWrites();
+     }
+
+     // Decrements the outstanding count, reporting once it hits zero; a no-op when already at zero.
+     private void countDown() {
+         if (counter <= 0) {
+             return;
+         }
+         counter -= 1;
+         if (counter == 0) {
+             finishedAllWrites();
+         }
+     }
+
+     // Reports (batchSize - counter) handled records the first time it is called, then does nothing.
+     private void finishedAllWrites() {
+         if (completed) {
+             return;
+         }
+         metricsGroup.recordWrite(batchSize - counter);
+         completed = true;
+     }
+ }
+
static class SourceTaskMetricsGroup {
private final MetricGroup metricGroup;
private final Sensor sourceRecordPoll;
private final Sensor sourceRecordWrite;
+ private final Sensor sourceRecordActiveCount;
private final Sensor pollTime;
+ private int activeRecordCount;
public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
metricGroup = connectMetrics.group("source-task-metrics",
@@ -490,27 +525,16 @@ class WorkerSourceTask extends WorkerTask {
"The average time in milliseconds taken by this task to poll for a batch of source records"),
new Avg());
-// int buckets = 100;
-// MetricName p99 = metricGroup.metricName("poll-batch-99p-time-ms",
-// "The 99th percentile time in milliseconds spent by this task to poll for a batch of source records");
-// MetricName p95 = metricGroup.metricName("poll-batch-95p-time-ms",
-// "The 95th percentile time in milliseconds spent by this task to poll for a batch of source records");
-// MetricName p90 = metricGroup.metricName("poll-batch-90p-time-ms",
-// "The 90th percentile time in milliseconds spent by this task to poll for a batch of source records");
-// MetricName p75 = metricGroup.metricName("poll-batch-75p-time-ms",
-// "The 75th percentile time in milliseconds spent by this task to poll for a batch of source records");
-// MetricName p50 = metricGroup.metricName("poll-batch-50p-time-ms",
-// "The 50th percentile (median) time in milliseconds spent by this task to poll for a batch of source records");
-// Percentiles pollTimePercentiles = new Percentiles(4 * buckets,
-// 0.0,
-// 10*1000.0,
-// BucketSizing.LINEAR,
-// new Percentile(p50, 50),
-// new Percentile(p75, 75),
-// new Percentile(p90, 90),
-// new Percentile(p95, 95),
-// new Percentile(p99, 99));
-// pollTime.add(pollTimePercentiles);
+ sourceRecordActiveCount = metricGroup.metrics().sensor("source-record-active-count");
+ sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count",
+ "The number of records that have been produced by this task but not yet completely written to Kafka."),
+ new Value());
+ sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-max",
+ "The maximum number of records that have been produced by this task but not yet completely written to Kafka."),
+ new Max());
+ sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-avg",
+ "The average number of records that have been produced by this task but not yet completely written to Kafka."),
+ new Avg());
}
void close() {
@@ -520,10 +544,15 @@ class WorkerSourceTask extends WorkerTask {
void recordPoll(int batchSize, long duration) {
sourceRecordPoll.record(batchSize);
pollTime.record(duration);
+ activeRecordCount += batchSize;
+ sourceRecordActiveCount.record(activeRecordCount);
}
void recordWrite(int recordCount) {
sourceRecordWrite.record(recordCount);
+ activeRecordCount -= recordCount;
+ activeRecordCount = Math.max(0, activeRecordCount);
+ sourceRecordActiveCount.record(activeRecordCount);
}
protected MetricGroup metricGroup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index a4592b8..00be485 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -53,7 +53,11 @@ public class MockConnectMetrics extends ConnectMetrics {
}
public MockConnectMetrics() {
- super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+ this(new MockTime());
+ }
+
+ public MockConnectMetrics(MockTime time) {
+ super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
}
@Override
@@ -133,7 +137,8 @@ public class MockConnectMetrics extends ConnectMetrics {
* @return the current value of the metric
*/
public double currentMetricValue(MetricName metricName) {
- return metricsByName.get(metricName).value();
+ KafkaMetric metric = metricsByName.get(metricName);
+ return metric != null ? metric.value() : Double.NaN;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5f3f888..290cdd0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -107,7 +107,7 @@ public class WorkerSinkTaskTest {
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private TargetState initialState = TargetState.STARTED;
- private Time time;
+ private MockTime time;
private WorkerSinkTask workerTask;
@Mock
private SinkTask sinkTask;
@@ -144,7 +144,7 @@ public class WorkerSinkTaskTest {
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerConfig = new StandaloneConfig(workerProps);
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
- metrics = new MockConnectMetrics();
+ metrics = new MockConnectMetrics(time);
recordsReturnedTp1 = 0;
recordsReturnedTp3 = 0;
}
@@ -177,6 +177,14 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
+ time.sleep(10000L);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertTaskMetricValue("status-running", 0.0);
+ assertTaskMetricValue("status-paused", 1.0);
+ assertTaskMetricValue("running-ratio", 0.0);
+ assertTaskMetricValue("pause-ratio", 1.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
PowerMock.verifyAll();
}
@@ -232,11 +240,47 @@ public class WorkerSinkTaskTest {
workerTask.iteration(); // initial assignment
workerTask.iteration(); // fetch some data
workerTask.transitionTo(TargetState.PAUSED);
+ time.sleep(10000L);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 1.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
+ assertSinkMetricValue("offset-commit-seq-no", 0.0);
+ assertSinkMetricValue("offset-commit-completion-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-total", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 0.5);
+ assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+
workerTask.iteration(); // wakeup
workerTask.iteration(); // now paused
+ time.sleep(30000L);
+
+ assertSinkMetricValue("offset-commit-seq-no", 1.0);
+ assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
+ assertSinkMetricValue("offset-commit-completion-total", 1.0);
+ assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 0.0);
+ assertTaskMetricValue("status-paused", 1.0);
+ assertTaskMetricValue("running-ratio", 0.25);
+ assertTaskMetricValue("pause-ratio", 0.75);
+
workerTask.transitionTo(TargetState.STARTED);
workerTask.iteration(); // wakeup
workerTask.iteration(); // now unpaused
+ //printMetrics();
PowerMock.verifyAll();
}
@@ -276,8 +320,43 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
+ time.sleep(10000L);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 0.0);
+ assertSinkMetricValue("sink-record-send-total", 0.0);
+ assertSinkMetricValue("sink-record-active-count", 0.0);
+ assertSinkMetricValue("sink-record-active-count-max", 0.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.0);
+ assertSinkMetricValue("offset-commit-seq-no", 0.0);
+ assertSinkMetricValue("offset-commit-completion-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-total", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 0.0);
+ assertTaskMetricValue("batch-size-avg", 0.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+
workerTask.iteration();
workerTask.iteration();
+ time.sleep(30000L);
+
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 1.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.5);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 0.5);
PowerMock.verifyAll();
}
@@ -394,10 +473,35 @@ public class WorkerSinkTaskTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
+ time.sleep(30000L);
workerTask.initializeAndStart();
+ time.sleep(30000L);
+
workerTask.iteration(); // poll for initial assignment
+ time.sleep(30000L);
workerTask.iteration(); // first record delivered
workerTask.iteration(); // now rebalance with the wakeup triggered
+ time.sleep(30000L);
+
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 0.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.33333);
+ assertSinkMetricValue("offset-commit-seq-no", 1.0);
+ assertSinkMetricValue("offset-commit-completion-total", 1.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 1.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 1.0);
PowerMock.verifyAll();
}
@@ -440,18 +544,61 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
- workerTask.iteration(); // initial assignment
+ // Initial assignment
+ time.sleep(30000L);
+ workerTask.iteration();
+ assertSinkMetricValue("partition-count", 2);
- workerTask.iteration(); // first record delivered
+ // First record delivered
+ workerTask.iteration();
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 1.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
+ assertSinkMetricValue("offset-commit-seq-no", 0.0);
+ assertSinkMetricValue("offset-commit-completion-total", 0.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 0.5);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 0.0);
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+ time.sleep(10000L);
workerTask.iteration(); // triggers the commit
+ time.sleep(10000L);
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
+ assertSinkMetricValue("partition-count", 2);
+ assertSinkMetricValue("sink-record-read-total", 1.0);
+ assertSinkMetricValue("sink-record-send-total", 1.0);
+ assertSinkMetricValue("sink-record-active-count", 0.0);
+ assertSinkMetricValue("sink-record-active-count-max", 1.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.2);
+ assertSinkMetricValue("offset-commit-seq-no", 1.0);
+ assertSinkMetricValue("offset-commit-completion-total", 1.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 1.0);
+ assertTaskMetricValue("batch-size-avg", 0.33333);
+ assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
PowerMock.verifyAll();
}
@@ -835,6 +982,26 @@ public class WorkerSinkTaskTest {
sinkTaskContext.getValue().requestCommit();
workerTask.iteration(); // iter 3 -- commit in progress
+ assertSinkMetricValue("partition-count", 3);
+ assertSinkMetricValue("sink-record-read-total", 3.0);
+ assertSinkMetricValue("sink-record-send-total", 3.0);
+ assertSinkMetricValue("sink-record-active-count", 4.0);
+ assertSinkMetricValue("sink-record-active-count-max", 4.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.71429);
+ assertSinkMetricValue("offset-commit-seq-no", 2.0);
+ assertSinkMetricValue("offset-commit-completion-total", 1.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 2.0);
+ assertTaskMetricValue("batch-size-avg", 1.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
assertTrue(asyncCallbackRan.get());
assertTrue(rebalanced.get());
@@ -850,6 +1017,26 @@ public class WorkerSinkTaskTest {
assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
+ assertSinkMetricValue("partition-count", 3);
+ assertSinkMetricValue("sink-record-read-total", 4.0);
+ assertSinkMetricValue("sink-record-send-total", 4.0);
+ assertSinkMetricValue("sink-record-active-count", 0.0);
+ assertSinkMetricValue("sink-record-active-count-max", 4.0);
+ assertSinkMetricValue("sink-record-active-count-avg", 0.5555555);
+ assertSinkMetricValue("offset-commit-seq-no", 3.0);
+ assertSinkMetricValue("offset-commit-completion-total", 2.0);
+ assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+ assertTaskMetricValue("status-running", 1.0);
+ assertTaskMetricValue("status-paused", 0.0);
+ assertTaskMetricValue("running-ratio", 1.0);
+ assertTaskMetricValue("pause-ratio", 0.0);
+ assertTaskMetricValue("batch-size-max", 2.0);
+ assertTaskMetricValue("batch-size-avg", 1.0);
+ assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+ assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+ assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
PowerMock.verifyAll();
}
@@ -1096,6 +1283,76 @@ public class WorkerSinkTaskTest {
}).times(numMessages);
}
+
+ private void assertSinkMetricValue(String name, double expected) {
+ MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+ double measured = metrics.currentMetricValue(sinkTaskGroup, name);
+ assertEquals(expected, measured, 0.001d);
+ }
+
+ private void assertTaskMetricValue(String name, double expected) {
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ double measured = metrics.currentMetricValue(taskGroup, name);
+ assertEquals(expected, measured, 0.001d);
+ }
+
+ private void printMetrics() {
+ System.out.println("");
+ sinkMetricValue("sink-record-read-rate");
+ sinkMetricValue("sink-record-read-total");
+ sinkMetricValue("sink-record-send-rate");
+ sinkMetricValue("sink-record-send-total");
+ sinkMetricValue("sink-record-active-count");
+ sinkMetricValue("sink-record-active-count-max");
+ sinkMetricValue("sink-record-active-count-avg");
+ sinkMetricValue("partition-count");
+ sinkMetricValue("offset-commit-seq-no");
+ sinkMetricValue("offset-commit-completion-rate");
+ sinkMetricValue("offset-commit-completion-total");
+ sinkMetricValue("offset-commit-completion-skip-rate");
+ sinkMetricValue("offset-commit-completion-skip-total");
+ sinkMetricValue("put-batch-max-time-ms");
+ sinkMetricValue("put-batch-avg-time-ms");
+
+ taskMetricValue("status-unassigned");
+ taskMetricValue("status-running");
+ taskMetricValue("status-paused");
+ taskMetricValue("status-failed");
+ taskMetricValue("status-destroyed");
+ taskMetricValue("running-ratio");
+ taskMetricValue("pause-ratio");
+ taskMetricValue("offset-commit-max-time-ms");
+ taskMetricValue("offset-commit-avg-time-ms");
+ taskMetricValue("batch-size-max");
+ taskMetricValue("batch-size-avg");
+ taskMetricValue("offset-commit-failure-percentage");
+ taskMetricValue("offset-commit-success-percentage");
+ }
+
+ private double sinkMetricValue(String metricName) {
+ MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+ double value = metrics.currentMetricValue(sinkTaskGroup, metricName);
+ System.out.println("** " + metricName + "=" + value);
+ return value;
+ }
+
+ private double taskMetricValue(String metricName) {
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ double value = metrics.currentMetricValue(taskGroup, metricName);
+ System.out.println("** " + metricName + "=" + value);
+ return value;
+ }
+
+
+ /**
+  * Looks up the sink record read/send rate and total metrics in the worker task's
+  * sink task metric group.
+  *
+  * NOTE(review): despite its name this helper performs no assertions; the values are
+  * only retrieved. Add real expectations here or remove the helper if it has no callers.
+  *
+  * @param minimumPollCountExpected currently unused; kept so the signature is unchanged
+  */
+ private void assertMetrics(int minimumPollCountExpected) {
+     MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+     // The results were never used, so they are no longer stored in locals; the look-ups
+     // themselves are kept so the same metrics are still queried as before.
+     metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-rate");
+     metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-total");
+     metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-rate");
+     metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-total");
+ }
+
// Abstract SinkTask subclass that declares no members of its own.
// NOTE(review): presumably exists so the test can mock a SinkTask type it owns — confirm against the mock setup.
private abstract static class TestSinkTask extends SinkTask {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8a23ad5..d1e0290 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -820,6 +820,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
assertTrue(pollBatchTimeMax >= 0.0d);
}
assertTrue(pollBatchTimeAvg >= 0.0d);
+ double activeCount = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count");
+ double activeCountMax = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count-max");
+ assertEquals(0, activeCount, 0.000001d);
+ if (minimumPollCountExpected > 0) {
+ assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+ }
}
private abstract static class TestSourceTask extends SourceTask {