Posted to commits@kafka.apache.org by ew...@apache.org on 2017/10/03 18:52:17 UTC

kafka git commit: KAFKA-5902: Added sink task metrics (KIP-196)

Repository: kafka
Updated Branches:
  refs/heads/trunk 5663f51ed -> 05357b703


KAFKA-5902: Added sink task metrics (KIP-196)

Added Connect metrics specific to sink tasks (KIP-196). This builds upon #3864 and #3911, which have already been merged into `trunk`, and #3959, which has yet to be merged.

I'll rebase this PR once the latter is merged.
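
For anyone who wants to eyeball the new metrics on a live worker, here is a minimal sketch that reads one of them over JMX. The connector name `my-connector` and task number 0 are placeholders, and it assumes the default JmxReporter is registered so that the group name and tags below surface as `kafka.connect:type=sink-task-metrics,connector=...,task=...`:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class SinkTaskMetricsProbe {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Placeholder connector/task coordinates; run in-process with the
            // worker or attach through a remote JMX connector instead.
            ObjectName name = new ObjectName(
                    "kafka.connect:type=sink-task-metrics,connector=my-connector,task=0");
            Object sendTotal = server.getAttribute(name, "sink-record-send-total");
            System.out.println("sink-record-send-total = " + sendTotal);
        }
    }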

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #3975 from rhauch/kafka-5902


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05357b70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05357b70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05357b70

Branch: refs/heads/trunk
Commit: 05357b7030c784a6548453f533d3c00e19548ba2
Parents: 5663f51
Author: Randall Hauch <rh...@gmail.com>
Authored: Tue Oct 3 11:52:14 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Oct 3 11:52:14 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/StateTracker.java     |   2 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   | 209 +++++++++++++++
 .../kafka/connect/runtime/WorkerSourceTask.java |  81 ++++--
 .../connect/runtime/MockConnectMetrics.java     |   9 +-
 .../connect/runtime/WorkerSinkTaskTest.java     | 267 ++++++++++++++++++-
 .../connect/runtime/WorkerSourceTaskTest.java   |   6 +
 6 files changed, 540 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 566a6fc..f2f0986 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -167,7 +167,7 @@ public class StateTracker {
             }
             long total = durationCurrent + unassignedTotalTimeMs + runningTotalTimeMs + pausedTotalTimeMs +
                                  failedTotalTimeMs + destroyedTotalTimeMs;
-            return (double) durationDesired / total;
+            return total == 0.0d ? 0.0d : (double) durationDesired / total;
         }
     }
 }
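
The one-line change above is a divide-by-zero guard: in Java, (double) 0 / 0 is NaN, so a task that has spent no measured time in any state would previously report NaN for its duration ratio. A standalone sketch of the guarded expression, with hypothetical values:

    long durationDesired = 0L;
    long total = 0L; // durationCurrent plus all per-state totals, still zero
    double ratio = total == 0.0d ? 0.0d : (double) durationDesired / total;
    // ratio is 0.0; without the guard it would be Double.NaN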

http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8f0f7fd..3cb68b7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -26,11 +26,18 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -62,6 +69,7 @@ class WorkerSinkTask extends WorkerTask {
     private final Converter keyConverter;
     private final Converter valueConverter;
     private final TransformationChain<SinkRecord> transformationChain;
+    private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
     private final List<SinkRecord> messageBatch;
@@ -106,6 +114,8 @@ class WorkerSinkTask extends WorkerTask {
         this.commitSeqno = 0;
         this.commitStarted = -1;
         this.commitFailures = 0;
+        this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
+        this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
     }
 
     @Override
@@ -139,6 +149,7 @@ class WorkerSinkTask extends WorkerTask {
 
     @Override
     protected void releaseResources() {
+        sinkTaskMetricsGroup.close();
     }
 
     @Override
@@ -215,6 +226,7 @@ class WorkerSinkTask extends WorkerTask {
         if (commitSeqno != seqno) {
             log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}",
                     this, seqno, commitSeqno);
+            sinkTaskMetricsGroup.recordOffsetCommitSkip();
         } else {
             long durationMillis = time.milliseconds() - commitStarted;
             if (error != null) {
@@ -228,6 +240,7 @@ class WorkerSinkTask extends WorkerTask {
                 if (committedOffsets != null) {
                     log.debug("{} Setting last committed offsets to {}", this, committedOffsets);
                     lastCommittedOffsets = committedOffsets;
+                    sinkTaskMetricsGroup.recordCommittedOffsets(committedOffsets);
                 }
                 commitFailures = 0;
                 recordCommitSuccess(durationMillis);
@@ -322,6 +335,7 @@ class WorkerSinkTask extends WorkerTask {
         committing = true;
         commitSeqno += 1;
         commitStarted = now;
+        sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 
         final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
         try {
@@ -400,6 +414,7 @@ class WorkerSinkTask extends WorkerTask {
             throw e;
         }
 
+        sinkTaskMetricsGroup.recordRead(msgs.count());
         return msgs;
     }
 
@@ -456,6 +471,7 @@ class WorkerSinkTask extends WorkerTask {
                         this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
             }
         }
+        sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
     }
 
     private void resumeAll() {
@@ -473,8 +489,10 @@ class WorkerSinkTask extends WorkerTask {
         try {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
+            long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
             recordBatch(messageBatch.size());
+            sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
             currentOffsets.putAll(origOffsets);
             messageBatch.clear();
             // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
@@ -519,11 +537,34 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private void openPartitions(Collection<TopicPartition> partitions) {
+        sinkTaskMetricsGroup.recordPartitionCount(partitions.size());
         task.open(partitions);
     }
 
     private void closePartitions() {
         commitOffsets(time.milliseconds(), true);
+        sinkTaskMetricsGroup.recordPartitionCount(0);
+    }
+
+    @Override
+    protected void recordBatch(int size) {
+        super.recordBatch(size);
+        sinkTaskMetricsGroup.recordSend(size);
+    }
+
+    @Override
+    protected void recordCommitFailure(long duration, Throwable error) {
+        super.recordCommitFailure(duration, error);
+    }
+
+    @Override
+    protected void recordCommitSuccess(long duration) {
+        super.recordCommitSuccess(duration);
+        sinkTaskMetricsGroup.recordOffsetCommitSuccess();
+    }
+
+    SinkTaskMetricsGroup sinkTaskMetricsGroup() {
+        return sinkTaskMetricsGroup;
     }
 
     private class HandleRebalance implements ConsumerRebalanceListener {
@@ -538,6 +579,7 @@ class WorkerSinkTask extends WorkerTask {
                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
                 log.debug("{} Assigned topic partition {} with offset {}", this, tp, pos);
             }
+            sinkTaskMetricsGroup.assignedOffsets(currentOffsets);
 
             // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
             // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
@@ -572,6 +614,7 @@ class WorkerSinkTask extends WorkerTask {
             log.debug("{} Partitions revoked", WorkerSinkTask.this);
             try {
                 closePartitions();
+                sinkTaskMetricsGroup.clearOffsets();
             } catch (RuntimeException e) {
                 // The consumer swallows exceptions raised in the rebalance listener, so we need to store
                 // exceptions and rethrow when poll() returns.
@@ -582,4 +625,170 @@ class WorkerSinkTask extends WorkerTask {
             messageBatch.clear();
         }
     }
+
+    static class SinkTaskMetricsGroup {
+        private final MetricGroup metricGroup;
+        private final Sensor sinkRecordRead;
+        private final Sensor sinkRecordSend;
+        private final Sensor partitionCount;
+        private final Sensor offsetSeqNum;
+        private final Sensor offsetCompletion;
+        private final Sensor offsetCompletionSkip;
+        private final Sensor putBatchTime;
+        private final Sensor sinkRecordActiveCount;
+        private long activeRecords;
+        private Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
+        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+
+        public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
+            metricGroup = connectMetrics.group("sink-task-metrics",
+                    "connector", id.connector(), "task", Integer.toString(id.task()));
+
+            sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+            sinkRecordRead.add(metricGroup.metricName("sink-record-read-rate",
+                    "The average per-second number of records read from Kafka for this task belonging to the " +
+                            "named sink connector in this worker. This is before transformations are applied."),
+                    new Rate());
+            sinkRecordRead.add(metricGroup.metricName("sink-record-read-total",
+                    "The total number of records read/polled (before transformation) by this task belonging " +
+                            "to the named sink connector in this worker, since the task was last restarted."),
+                    new Total());
+
+            sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+            sinkRecordSend.add(metricGroup.metricName("sink-record-send-rate",
+                    "The average per-second number of records output from the transformations and sent/put to " +
+                            "this task belonging to the named sink connector in this worker. This is after transformations " +
+                            "are applied and excludes any records filtered out by the transformations."),
+                    new Rate());
+            sinkRecordSend.add(metricGroup.metricName("sink-record-send-total",
+                    "The total number of records output from the transformations and sent/put to this task " +
+                            "belonging to the named sink connector in this worker, since the task was last restarted."),
+                    new Total());
+
+            sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+            sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count",
+                    "The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                        "by the sink task."),
+                    new Value());
+            sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-max",
+                    "The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                        "by the sink task."),
+                    new Max());
+            sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-avg",
+                    "The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged " +
+                        "by the sink task."),
+                    new Avg());
+
+            partitionCount = metricGroup.metrics().sensor("partition-count");
+            partitionCount.add(metricGroup.metricName("partition-count",
+                    "The number of topic partitions assigned to this task belonging to the named sink connector in this worker."),
+                    new Value());
+
+            offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+            offsetSeqNum.add(metricGroup.metricName("offset-commit-seq-no",
+                    "The current sequence number for offset commits"),
+                    new Value());
+
+            offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
+            offsetCompletion.add(metricGroup.metricName("offset-commit-completion-rate",
+                    "The average per-second number of offset commits that completed successfully."),
+                    new Rate());
+            offsetCompletion.add(metricGroup.metricName("offset-commit-completion-total",
+                    "The total number of offset commits that completed successfully."),
+                    new Total());
+
+            offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
+            offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-rate",
+                    "The average per-second number of offset commit completions that were received too late and skipped/ignored."),
+                    new Rate());
+            offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-total",
+                    "The total number of offset commit completions that were received too late and skipped/ignored."),
+                    new Total());
+
+            putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+            putBatchTime.add(metricGroup.metricName("put-batch-max-time-ms",
+                    "The maximum time taken by this task to put a batch of sink records."),
+                    new Max());
+            putBatchTime.add(metricGroup.metricName("put-batch-avg-time-ms",
+                    "The average time taken by this task to put a batch of sink records."),
+                    new Avg());
+        }
+
+        void computeSinkRecordLag() {
+            Map<TopicPartition, OffsetAndMetadata> consumed = this.consumedOffsets;
+            Map<TopicPartition, OffsetAndMetadata> committed = this.committedOffsets;
+            activeRecords = 0L;
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> committedOffsetEntry : committed.entrySet()) {
+                final TopicPartition partition = committedOffsetEntry.getKey();
+                final OffsetAndMetadata consumedOffsetMeta = consumed.get(partition);
+                if (consumedOffsetMeta != null) {
+                    final OffsetAndMetadata committedOffsetMeta = committedOffsetEntry.getValue();
+                    long consumedOffset = consumedOffsetMeta.offset();
+                    long committedOffset = committedOffsetMeta.offset();
+                    long diff = consumedOffset - committedOffset;
+                    // Connector tasks can return offsets, so make sure nothing wonky happens
+                    activeRecords += Math.max(diff, 0L);
+                }
+            }
+            sinkRecordActiveCount.record(activeRecords);
+        }
+
+        void close() {
+            metricGroup.close();
+        }
+
+        void recordRead(int batchSize) {
+            sinkRecordRead.record(batchSize);
+        }
+
+        void recordSend(int batchSize) {
+            sinkRecordSend.record(batchSize);
+        }
+
+        void recordPut(long duration) {
+            putBatchTime.record(duration);
+        }
+
+        void recordPartitionCount(int assignedPartitionCount) {
+            partitionCount.record(assignedPartitionCount);
+        }
+
+        void recordOffsetSequenceNumber(int seqNum) {
+            offsetSeqNum.record(seqNum);
+        }
+
+        void recordConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            consumedOffsets.putAll(offsets);
+            computeSinkRecordLag();
+        }
+
+        void recordCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            committedOffsets = offsets;
+            computeSinkRecordLag();
+        }
+
+        void assignedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            consumedOffsets = new HashMap<>(offsets);
+            committedOffsets = offsets;
+            sinkRecordActiveCount.record(0.0);
+        }
+
+        void clearOffsets() {
+            consumedOffsets.clear();
+            committedOffsets.clear();
+            sinkRecordActiveCount.record(0.0);
+        }
+
+        void recordOffsetCommitSuccess() {
+            offsetCompletion.record(1.0);
+        }
+
+        void recordOffsetCommitSkip() {
+            offsetCompletionSkip.record(1.0);
+        }
+
+        protected MetricGroup metricGroup() {
+            return metricGroup;
+        }
+    }
 }
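
To make the computeSinkRecordLag() arithmetic above concrete, here is a standalone sketch with hypothetical offsets; plain String keys stand in for TopicPartition so the example is self-contained:

    import java.util.HashMap;
    import java.util.Map;

    public class LagSketch {
        public static void main(String[] args) {
            Map<String, Long> consumed = new HashMap<>();
            Map<String, Long> committed = new HashMap<>();
            consumed.put("topic-0", 105L);  // read through offset 105
            committed.put("topic-0", 100L); // committed through offset 100
            consumed.put("topic-1", 42L);
            committed.put("topic-1", 50L);  // task returned an offset past what was consumed
            long activeRecords = 0L;
            for (Map.Entry<String, Long> entry : committed.entrySet()) {
                Long consumedOffset = consumed.get(entry.getKey());
                if (consumedOffset != null) {
                    // Clamp at zero, mirroring the Math.max guard above
                    activeRecords += Math.max(consumedOffset - entry.getValue(), 0L);
                }
            }
            System.out.println(activeRecords); // 5: topic-0 adds 5, topic-1 clamps to 0
        }
    }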

http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0f1874e..9a187d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@@ -205,10 +206,12 @@ class WorkerSourceTask extends WorkerTask {
     private boolean sendRecords() {
         int processed = 0;
         recordBatch(toSend.size());
+        final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
             final SourceRecord record = transformationChain.apply(preTransformRecord);
 
             if (record == null) {
+                counter.skipRecord();
                 commitTaskRecord(preTransformRecord);
                 continue;
             }
@@ -256,6 +259,7 @@ class WorkerSourceTask extends WorkerTask {
                                     commitTaskRecord(preTransformRecord);
                                 }
                                 recordSent(producerRecord);
+                                counter.completeRecord();
                             }
                         });
                 lastSendFailed = false;
@@ -263,6 +267,7 @@ class WorkerSourceTask extends WorkerTask {
                 log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
+                counter.retryRemaining();
                 return false;
             } catch (KafkaException e) {
                 throw new ConnectException("Unrecoverable exception trying to send", e);
@@ -282,7 +287,6 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
-        recordWriteCompleted(1);
         ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog
         if (removed == null && flushing)
@@ -440,19 +444,50 @@ class WorkerSourceTask extends WorkerTask {
         sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
     }
 
-    protected void recordWriteCompleted(int numRecords) {
-        sourceTaskMetricsGroup.recordWrite(numRecords);
-    }
-
     SourceTaskMetricsGroup sourceTaskMetricsGroup() {
         return sourceTaskMetricsGroup;
     }
 
+    static class SourceRecordWriteCounter {
+        private final SourceTaskMetricsGroup metricsGroup;
+        private final int batchSize;
+        private boolean completed = false;
+        private int counter;
+        public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
+            assert batchSize > 0;
+            assert metricsGroup != null;
+            this.batchSize = batchSize;
+            counter = batchSize;
+            this.metricsGroup = metricsGroup;
+        }
+        public void skipRecord() {
+            if (counter > 0 && --counter == 0) {
+                finishedAllWrites();
+            }
+        }
+        public void completeRecord() {
+            if (counter > 0 && --counter == 0) {
+                finishedAllWrites();
+            }
+        }
+        public void retryRemaining() {
+            finishedAllWrites();
+        }
+        private void finishedAllWrites() {
+            if (!completed) {
+                metricsGroup.recordWrite(batchSize - counter);
+                completed = true;
+            }
+        }
+    }
+
     static class SourceTaskMetricsGroup {
         private final MetricGroup metricGroup;
         private final Sensor sourceRecordPoll;
         private final Sensor sourceRecordWrite;
+        private final Sensor sourceRecordActiveCount;
         private final Sensor pollTime;
+        private int activeRecordCount;
 
         public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
             metricGroup = connectMetrics.group("source-task-metrics",
@@ -490,27 +525,16 @@ class WorkerSourceTask extends WorkerTask {
                     "The average time in milliseconds taken by this task to poll for a batch of source records"),
                     new Avg());
 
-//            int buckets = 100;
-//            MetricName p99 = metricGroup.metricName("poll-batch-99p-time-ms",
-//                    "The 99th percentile time in milliseconds spent by this task to poll for a batch of source records");
-//            MetricName p95 = metricGroup.metricName("poll-batch-95p-time-ms",
-//                    "The 95th percentile time in milliseconds spent by this task to poll for a batch of source records");
-//            MetricName p90 = metricGroup.metricName("poll-batch-90p-time-ms",
-//                    "The 90th percentile time in milliseconds spent by this task to poll for a batch of source records");
-//            MetricName p75 = metricGroup.metricName("poll-batch-75p-time-ms",
-//                    "The 75th percentile time in milliseconds spent by this task to poll for a batch of source records");
-//            MetricName p50 = metricGroup.metricName("poll-batch-50p-time-ms",
-//                    "The 50th percentile (median) time in milliseconds spent by this task to poll for a batch of source records");
-//            Percentiles pollTimePercentiles = new Percentiles(4 * buckets,
-//                                                            0.0,
-//                                                            10*1000.0,
-//                                                            BucketSizing.LINEAR,
-//                                                            new Percentile(p50, 50),
-//                                                            new Percentile(p75, 75),
-//                                                            new Percentile(p90, 90),
-//                                                            new Percentile(p95, 95),
-//                                                            new Percentile(p99, 99));
-//            pollTime.add(pollTimePercentiles);
+            sourceRecordActiveCount = metricGroup.metrics().sensor("source-record-active-count");
+            sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count",
+                    "The number of records that have been produced by this task but not yet completely written to Kafka."),
+                    new Value());
+            sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-max",
+                    "The maximum number of records that have been produced by this task but not yet completely written to Kafka."),
+                    new Max());
+            sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-avg",
+                    "The average number of records that have been produced by this task but not yet completely written to Kafka."),
+                    new Avg());
         }
 
         void close() {
@@ -520,10 +544,15 @@ class WorkerSourceTask extends WorkerTask {
         void recordPoll(int batchSize, long duration) {
             sourceRecordPoll.record(batchSize);
             pollTime.record(duration);
+            activeRecordCount += batchSize;
+            sourceRecordActiveCount.record(activeRecordCount);
         }
 
         void recordWrite(int recordCount) {
             sourceRecordWrite.record(recordCount);
+            activeRecordCount -= recordCount;
+            activeRecordCount = Math.max(0, activeRecordCount);
+            sourceRecordActiveCount.record(activeRecordCount);
         }
 
         protected MetricGroup metricGroup() {
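
The SourceRecordWriteCounter above records a batch's completed writes exactly once, whether every record is accounted for individually or the remainder is cut short for a retry. A hypothetical call sequence for a batch of three records, assuming a sourceTaskMetricsGroup instance is in scope:

    SourceRecordWriteCounter counter =
            new SourceRecordWriteCounter(3, sourceTaskMetricsGroup); // counter = 3
    counter.completeRecord();  // counter = 2, batch not yet finished
    counter.skipRecord();      // counter = 1 (record dropped by a transformation)
    counter.retryRemaining();  // records a write of 2 (batchSize - counter), marks completed
    counter.completeRecord();  // no-op: the completed flag guards finishedAllWrites()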

http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index a4592b8..00be485 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -53,7 +53,11 @@ public class MockConnectMetrics extends ConnectMetrics {
     }
 
     public MockConnectMetrics() {
-        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+        this(new MockTime());
+    }
+
+    public MockConnectMetrics(MockTime time) {
+        super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
     }
 
     @Override
@@ -133,7 +137,8 @@ public class MockConnectMetrics extends ConnectMetrics {
          * @return the current value of the metric
          */
         public double currentMetricValue(MetricName metricName) {
-            return metricsByName.get(metricName).value();
+            KafkaMetric metric = metricsByName.get(metricName);
+            return metric != null ? metric.value() : Double.NaN;
         }
     }
 }
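
With the null guard above, asking the recorder for a metric that was never registered now yields Double.NaN instead of a NullPointerException, which tests can assert on directly. A hypothetical check, where `recorder` is the metrics recorder and `metricName` names a metric no sensor registered:

    double value = recorder.currentMetricValue(metricName);
    assertTrue(Double.isNaN(value)); // unregistered metric, not an NPE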

http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5f3f888..290cdd0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -107,7 +107,7 @@ public class WorkerSinkTaskTest {
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private TargetState initialState = TargetState.STARTED;
-    private Time time;
+    private MockTime time;
     private WorkerSinkTask workerTask;
     @Mock
     private SinkTask sinkTask;
@@ -144,7 +144,7 @@ public class WorkerSinkTaskTest {
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        metrics = new MockConnectMetrics();
+        metrics = new MockConnectMetrics(time);
         recordsReturnedTp1 = 0;
         recordsReturnedTp3 = 0;
     }
@@ -177,6 +177,14 @@ public class WorkerSinkTaskTest {
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.iteration();
+        time.sleep(10000L);
+
+        assertSinkMetricValue("partition-count", 2);
+        assertTaskMetricValue("status-running", 0.0);
+        assertTaskMetricValue("status-paused", 1.0);
+        assertTaskMetricValue("running-ratio", 0.0);
+        assertTaskMetricValue("pause-ratio", 1.0);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
 
         PowerMock.verifyAll();
     }
@@ -232,11 +240,47 @@ public class WorkerSinkTaskTest {
         workerTask.iteration(); // initial assignment
         workerTask.iteration(); // fetch some data
         workerTask.transitionTo(TargetState.PAUSED);
+        time.sleep(10000L);
+
+        assertSinkMetricValue("partition-count", 2);
+        assertSinkMetricValue("sink-record-read-total", 1.0);
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        assertSinkMetricValue("sink-record-active-count", 1.0);
+        assertSinkMetricValue("sink-record-active-count-max", 1.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
+        assertSinkMetricValue("offset-commit-seq-no", 0.0);
+        assertSinkMetricValue("offset-commit-completion-rate", 0.0);
+        assertSinkMetricValue("offset-commit-completion-total", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 1.0);
+        assertTaskMetricValue("batch-size-avg", 0.5);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+
         workerTask.iteration(); // wakeup
         workerTask.iteration(); // now paused
+        time.sleep(30000L);
+
+        assertSinkMetricValue("offset-commit-seq-no", 1.0);
+        assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
+        assertSinkMetricValue("offset-commit-completion-total", 1.0);
+        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 0.0);
+        assertTaskMetricValue("status-paused", 1.0);
+        assertTaskMetricValue("running-ratio", 0.25);
+        assertTaskMetricValue("pause-ratio", 0.75);
+
         workerTask.transitionTo(TargetState.STARTED);
         workerTask.iteration(); // wakeup
         workerTask.iteration(); // now unpaused
+        //printMetrics();
 
         PowerMock.verifyAll();
     }
@@ -276,8 +320,43 @@ public class WorkerSinkTaskTest {
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.iteration();
+        time.sleep(10000L);
+
+        assertSinkMetricValue("partition-count", 2);
+        assertSinkMetricValue("sink-record-read-total", 0.0);
+        assertSinkMetricValue("sink-record-send-total", 0.0);
+        assertSinkMetricValue("sink-record-active-count", 0.0);
+        assertSinkMetricValue("sink-record-active-count-max", 0.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.0);
+        assertSinkMetricValue("offset-commit-seq-no", 0.0);
+        assertSinkMetricValue("offset-commit-completion-rate", 0.0);
+        assertSinkMetricValue("offset-commit-completion-total", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 0.0);
+        assertTaskMetricValue("batch-size-avg", 0.0);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+
         workerTask.iteration();
         workerTask.iteration();
+        time.sleep(30000L);
+
+        assertSinkMetricValue("sink-record-read-total", 1.0);
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        assertSinkMetricValue("sink-record-active-count", 1.0);
+        assertSinkMetricValue("sink-record-active-count-max", 1.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.5);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("batch-size-max", 1.0);
+        assertTaskMetricValue("batch-size-avg", 0.5);
 
         PowerMock.verifyAll();
     }
@@ -394,10 +473,35 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        time.sleep(30000L);
         workerTask.initializeAndStart();
+        time.sleep(30000L);
+
         workerTask.iteration(); // poll for initial assignment
+        time.sleep(30000L);
         workerTask.iteration(); // first record delivered
         workerTask.iteration(); // now rebalance with the wakeup triggered
+        time.sleep(30000L);
+
+        assertSinkMetricValue("partition-count", 2);
+        assertSinkMetricValue("sink-record-read-total", 1.0);
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        assertSinkMetricValue("sink-record-active-count", 0.0);
+        assertSinkMetricValue("sink-record-active-count-max", 1.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.33333);
+        assertSinkMetricValue("offset-commit-seq-no", 1.0);
+        assertSinkMetricValue("offset-commit-completion-total", 1.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 1.0);
+        assertTaskMetricValue("batch-size-avg", 1.0);
+        assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 1.0);
 
         PowerMock.verifyAll();
     }
@@ -440,18 +544,61 @@ public class WorkerSinkTaskTest {
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
-        workerTask.iteration(); // initial assignment
+        // Initial assignment
+        time.sleep(30000L);
+        workerTask.iteration();
+        assertSinkMetricValue("partition-count", 2);
 
-        workerTask.iteration(); // first record delivered
+        // First record delivered
+        workerTask.iteration();
+        assertSinkMetricValue("partition-count", 2);
+        assertSinkMetricValue("sink-record-read-total", 1.0);
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        assertSinkMetricValue("sink-record-active-count", 1.0);
+        assertSinkMetricValue("sink-record-active-count-max", 1.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
+        assertSinkMetricValue("offset-commit-seq-no", 0.0);
+        assertSinkMetricValue("offset-commit-completion-total", 0.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 1.0);
+        assertTaskMetricValue("batch-size-avg", 0.5);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 0.0);
 
         sinkTaskContext.getValue().requestCommit();
         assertTrue(sinkTaskContext.getValue().isCommitRequested());
         assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        time.sleep(10000L);
         workerTask.iteration(); // triggers the commit
+        time.sleep(10000L);
         assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
         assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
         assertEquals(0, workerTask.commitFailures());
 
+        assertSinkMetricValue("partition-count", 2);
+        assertSinkMetricValue("sink-record-read-total", 1.0);
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        assertSinkMetricValue("sink-record-active-count", 0.0);
+        assertSinkMetricValue("sink-record-active-count-max", 1.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.2);
+        assertSinkMetricValue("offset-commit-seq-no", 1.0);
+        assertSinkMetricValue("offset-commit-completion-total", 1.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 1.0);
+        assertTaskMetricValue("batch-size-avg", 0.33333);
+        assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
         PowerMock.verifyAll();
     }
 
@@ -835,6 +982,26 @@ public class WorkerSinkTaskTest {
         sinkTaskContext.getValue().requestCommit();
         workerTask.iteration(); // iter 3 -- commit in progress
 
+        assertSinkMetricValue("partition-count", 3);
+        assertSinkMetricValue("sink-record-read-total", 3.0);
+        assertSinkMetricValue("sink-record-send-total", 3.0);
+        assertSinkMetricValue("sink-record-active-count", 4.0);
+        assertSinkMetricValue("sink-record-active-count-max", 4.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.71429);
+        assertSinkMetricValue("offset-commit-seq-no", 2.0);
+        assertSinkMetricValue("offset-commit-completion-total", 1.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 2.0);
+        assertTaskMetricValue("batch-size-avg", 1.0);
+        assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
         assertTrue(asyncCallbackRan.get());
         assertTrue(rebalanced.get());
 
@@ -850,6 +1017,26 @@ public class WorkerSinkTaskTest {
         assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "currentOffsets"));
         assertEquals(postRebalanceCurrentOffsets, Whitebox.getInternalState(workerTask, "lastCommittedOffsets"));
 
+        assertSinkMetricValue("partition-count", 3);
+        assertSinkMetricValue("sink-record-read-total", 4.0);
+        assertSinkMetricValue("sink-record-send-total", 4.0);
+        assertSinkMetricValue("sink-record-active-count", 0.0);
+        assertSinkMetricValue("sink-record-active-count-max", 4.0);
+        assertSinkMetricValue("sink-record-active-count-avg", 0.5555555);
+        assertSinkMetricValue("offset-commit-seq-no", 3.0);
+        assertSinkMetricValue("offset-commit-completion-total", 2.0);
+        assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+        assertTaskMetricValue("status-running", 1.0);
+        assertTaskMetricValue("status-paused", 0.0);
+        assertTaskMetricValue("running-ratio", 1.0);
+        assertTaskMetricValue("pause-ratio", 0.0);
+        assertTaskMetricValue("batch-size-max", 2.0);
+        assertTaskMetricValue("batch-size-avg", 1.0);
+        assertTaskMetricValue("offset-commit-max-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0);
+        assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
+        assertTaskMetricValue("offset-commit-success-percentage", 1.0);
+
         PowerMock.verifyAll();
     }
 
@@ -1096,6 +1283,76 @@ public class WorkerSinkTaskTest {
                 }).times(numMessages);
     }
 
+
+    private void assertSinkMetricValue(String name, double expected) {
+        MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValue(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void assertTaskMetricValue(String name, double expected) {
+        MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValue(taskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void printMetrics() {
+        System.out.println("");
+        sinkMetricValue("sink-record-read-rate");
+        sinkMetricValue("sink-record-read-total");
+        sinkMetricValue("sink-record-send-rate");
+        sinkMetricValue("sink-record-send-total");
+        sinkMetricValue("sink-record-active-count");
+        sinkMetricValue("sink-record-active-count-max");
+        sinkMetricValue("sink-record-active-count-avg");
+        sinkMetricValue("partition-count");
+        sinkMetricValue("offset-commit-seq-no");
+        sinkMetricValue("offset-commit-completion-rate");
+        sinkMetricValue("offset-commit-completion-total");
+        sinkMetricValue("offset-commit-completion-skip-rate");
+        sinkMetricValue("offset-commit-completion-skip-total");
+        sinkMetricValue("put-batch-max-time-ms");
+        sinkMetricValue("put-batch-avg-time-ms");
+
+        taskMetricValue("status-unassigned");
+        taskMetricValue("status-running");
+        taskMetricValue("status-paused");
+        taskMetricValue("status-failed");
+        taskMetricValue("status-destroyed");
+        taskMetricValue("running-ratio");
+        taskMetricValue("pause-ratio");
+        taskMetricValue("offset-commit-max-time-ms");
+        taskMetricValue("offset-commit-avg-time-ms");
+        taskMetricValue("batch-size-max");
+        taskMetricValue("batch-size-avg");
+        taskMetricValue("offset-commit-failure-percentage");
+        taskMetricValue("offset-commit-success-percentage");
+    }
+
+    private double sinkMetricValue(String metricName) {
+        MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+        double value = metrics.currentMetricValue(sinkTaskGroup, metricName);
+        System.out.println("** " + metricName + "=" + value);
+        return value;
+    }
+
+    private double taskMetricValue(String metricName) {
+        MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+        double value = metrics.currentMetricValue(taskGroup, metricName);
+        System.out.println("** " + metricName + "=" + value);
+        return value;
+    }
+
+
+    private void assertMetrics(int minimumPollCountExpected) {
+        MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup();
+        MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+        double readRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-rate");
+        double readTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-total");
+        double sendRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-rate");
+        double sendTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-total");
+    }
+
     private abstract static class TestSinkTask extends SinkTask  {
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/05357b70/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8a23ad5..d1e0290 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -820,6 +820,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             assertTrue(pollBatchTimeMax >= 0.0d);
         }
         assertTrue(pollBatchTimeAvg >= 0.0d);
+        double activeCount = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count");
+        double activeCountMax = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count-max");
+        assertEquals(0, activeCount, 0.000001d);
+        if (minimumPollCountExpected > 0) {
+            assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+        }
     }
 
     private abstract static class TestSourceTask extends SourceTask {