You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/06 17:18:41 UTC

[kafka] branch trunk updated: KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d70a42  KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)
7d70a42 is described below

commit 7d70a427b2689c0a1de4345db4b04ea557b4c27b
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Tue Feb 6 09:18:37 2018 -0800

    KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)
    
    This change ensures that when sensors are created, they are unique to the metric group associated with the task that created them. Previously the sensors were being shared between task metric groups, causing incorrect metrics.
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 16 +++---
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 65 ++++++++++++++++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 34 +++++++++++
 4 files changed, 108 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5aeb851..2995a4e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -684,34 +684,34 @@ class WorkerSinkTask extends WorkerTask {
             // prevent collisions by removing any previously created metrics in this group.
             metricGroup.close();
 
-            sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+            sinkRecordRead = metricGroup.sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
 
-            sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+            sinkRecordSend = metricGroup.sensor("sink-record-send");
             sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
             sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
 
-            sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+            sinkRecordActiveCount = metricGroup.sensor("sink-record-active-count");
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax), new Max());
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg), new Avg());
 
-            partitionCount = metricGroup.metrics().sensor("partition-count");
+            partitionCount = metricGroup.sensor("partition-count");
             partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), new Value());
 
-            offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+            offsetSeqNum = metricGroup.sensor("offset-seq-number");
             offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), new Value());
 
-            offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
+            offsetCompletion = metricGroup.sensor("offset-commit-completion");
             offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
             offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
 
-            offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
+            offsetCompletionSkip = metricGroup.sensor("offset-commit-completion-skip");
             offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
             offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
 
-            putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+            putBatchTime = metricGroup.sensor("put-batch-time");
             putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
             putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), new Avg());
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6288767..6ef5ae3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -531,7 +531,7 @@ class WorkerSourceTask extends WorkerTask {
             pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
             pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
 
-            sourceRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+            sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4ff4248..9568e78 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.RecordBatch;
@@ -32,6 +33,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -75,6 +77,7 @@ import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -108,6 +111,7 @@ public class WorkerSinkTaskTest {
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
     private TargetState initialState = TargetState.STARTED;
     private MockTime time;
     private WorkerSinkTask workerTask;
@@ -1183,6 +1187,67 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testMetricsGroup() {
+        SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
+        SinkTaskMetricsGroup group1 = new SinkTaskMetricsGroup(taskId1, metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordRead(1);
+            group.recordSend(2);
+            group.recordPut(3);
+            group.recordPartitionCount(4);
+            group.recordOffsetSequenceNumber(5);
+        }
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        group.recordCommittedOffsets(committedOffsets);
+        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
+        consumedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 10));
+        group.recordConsumedOffsets(consumedOffsets);
+
+        for (int i = 0; i != 20; ++i) {
+            group1.recordRead(1);
+            group1.recordSend(2);
+            group1.recordPut(30);
+            group1.recordPartitionCount(40);
+            group1.recordOffsetSequenceNumber(50);
+        }
+        committedOffsets = new HashMap<>();
+        committedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        committedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 3));
+        group1.recordCommittedOffsets(committedOffsets);
+        consumedOffsets = new HashMap<>();
+        consumedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 20));
+        consumedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 30));
+        group1.recordConsumedOffsets(consumedOffsets);
+
+        assertEquals(0.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-read-rate"), 0.001d);
+        assertEquals(0.667, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-send-rate"), 0.001d);
+        assertEquals(9, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-active-count"), 0.001d);
+        assertEquals(4, metrics.currentMetricValueAsDouble(group.metricGroup(), "partition-count"), 0.001d);
+        assertEquals(5, metrics.currentMetricValueAsDouble(group.metricGroup(), "offset-commit-seq-no"), 0.001d);
+        assertEquals(3, metrics.currentMetricValueAsDouble(group.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("source-record-poll"));
+        assertNull(group.metricGroup().metrics().getSensor("source-record-write"));
+        assertNull(group.metricGroup().metrics().getSensor("poll-batch-time"));
+
+        assertEquals(0.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-read-rate"), 0.001d);
+        assertEquals(1.333, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-send-rate"), 0.001d);
+        assertEquals(45, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-active-count"), 0.001d);
+        assertEquals(40, metrics.currentMetricValueAsDouble(group1.metricGroup(), "partition-count"), 0.001d);
+        assertEquals(50, metrics.currentMetricValueAsDouble(group1.metricGroup(), "offset-commit-seq-no"), 0.001d);
+        assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+    }
+
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 266c6ad..25c2cb1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
@@ -66,6 +67,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -88,6 +90,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private ExecutorService executor = Executors.newSingleThreadExecutor();
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
     private WorkerConfig config;
     private Plugins plugins;
     private MockConnectMetrics metrics;
@@ -613,16 +616,47 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Test
     public void testMetricsGroup() {
         SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
         for (int i = 0; i != 10; ++i) {
             group.recordPoll(100, 1000 + i * 100);
             group.recordWrite(10);
         }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
         assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
         assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
         assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
         assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
         assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
         assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
     }
 
     private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.