You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/06 17:50:31 UTC

[kafka] branch 1.0 updated: KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 375e3fc  KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)
375e3fc is described below

commit 375e3fce9a1978c017d210ce0c1225f78c6d8724
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Tue Feb 6 09:18:37 2018 -0800

    KAFKA-6504: Ensure uniqueness of connect task metric sensor creation (#4514)
    
    This change ensures that when sensors are created, they are unique to the metric group associated with the task that created them. Previously the sensors were being shared between task metric groups, causing incorrect metrics.
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 16 +++---
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 65 ++++++++++++++++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 34 +++++++++++
 4 files changed, 108 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 587e4c6..0feda02 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -655,34 +655,34 @@ class WorkerSinkTask extends WorkerTask {
             // prevent collisions by removing any previously created metrics in this group.
             metricGroup.close();
 
-            sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+            sinkRecordRead = metricGroup.sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
 
-            sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+            sinkRecordSend = metricGroup.sensor("sink-record-send");
             sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
             sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
 
-            sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+            sinkRecordActiveCount = metricGroup.sensor("sink-record-active-count");
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax), new Max());
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg), new Avg());
 
-            partitionCount = metricGroup.metrics().sensor("partition-count");
+            partitionCount = metricGroup.sensor("partition-count");
             partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), new Value());
 
-            offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+            offsetSeqNum = metricGroup.sensor("offset-seq-number");
             offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), new Value());
 
-            offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
+            offsetCompletion = metricGroup.sensor("offset-commit-completion");
             offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
             offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
 
-            offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
+            offsetCompletionSkip = metricGroup.sensor("offset-commit-completion-skip");
             offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
             offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
 
-            putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+            putBatchTime = metricGroup.sensor("put-batch-time");
             putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
             putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), new Avg());
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a172cdb..473e235 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -509,7 +509,7 @@ class WorkerSourceTask extends WorkerTask {
             pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
             pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
 
-            sourceRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+            sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
             sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 50b091d..913ef53 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.RecordBatch;
@@ -30,6 +31,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -73,6 +75,7 @@ import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -106,6 +109,7 @@ public class WorkerSinkTaskTest {
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
     private TargetState initialState = TargetState.STARTED;
     private MockTime time;
     private WorkerSinkTask workerTask;
@@ -1143,6 +1147,67 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testMetricsGroup() {
+        SinkTaskMetricsGroup group = new SinkTaskMetricsGroup(taskId, metrics);
+        SinkTaskMetricsGroup group1 = new SinkTaskMetricsGroup(taskId1, metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordRead(1);
+            group.recordSend(2);
+            group.recordPut(3);
+            group.recordPartitionCount(4);
+            group.recordOffsetSequenceNumber(5);
+        }
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        group.recordCommittedOffsets(committedOffsets);
+        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
+        consumedOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 10));
+        group.recordConsumedOffsets(consumedOffsets);
+
+        for (int i = 0; i != 20; ++i) {
+            group1.recordRead(1);
+            group1.recordSend(2);
+            group1.recordPut(30);
+            group1.recordPartitionCount(40);
+            group1.recordOffsetSequenceNumber(50);
+        }
+        committedOffsets = new HashMap<>();
+        committedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        committedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 3));
+        group1.recordCommittedOffsets(committedOffsets);
+        consumedOffsets = new HashMap<>();
+        consumedOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 20));
+        consumedOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET + 30));
+        group1.recordConsumedOffsets(consumedOffsets);
+
+        assertEquals(0.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-read-rate"), 0.001d);
+        assertEquals(0.667, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-send-rate"), 0.001d);
+        assertEquals(9, metrics.currentMetricValueAsDouble(group.metricGroup(), "sink-record-active-count"), 0.001d);
+        assertEquals(4, metrics.currentMetricValueAsDouble(group.metricGroup(), "partition-count"), 0.001d);
+        assertEquals(5, metrics.currentMetricValueAsDouble(group.metricGroup(), "offset-commit-seq-no"), 0.001d);
+        assertEquals(3, metrics.currentMetricValueAsDouble(group.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("source-record-poll"));
+        assertNull(group.metricGroup().metrics().getSensor("source-record-write"));
+        assertNull(group.metricGroup().metrics().getSensor("poll-batch-time"));
+
+        assertEquals(0.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-read-rate"), 0.001d);
+        assertEquals(1.333, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-send-rate"), 0.001d);
+        assertEquals(45, metrics.currentMetricValueAsDouble(group1.metricGroup(), "sink-record-active-count"), 0.001d);
+        assertEquals(40, metrics.currentMetricValueAsDouble(group1.metricGroup(), "partition-count"), 0.001d);
+        assertEquals(50, metrics.currentMetricValueAsDouble(group1.metricGroup(), "offset-commit-seq-no"), 0.001d);
+        assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
+    }
+
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 4f0d243..dfdf415 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
@@ -65,6 +66,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -87,6 +89,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private ExecutorService executor = Executors.newSingleThreadExecutor();
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
     private WorkerConfig config;
     private Plugins plugins;
     private MockConnectMetrics metrics;
@@ -611,16 +614,47 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Test
     public void testMetricsGroup() {
         SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
         for (int i = 0; i != 10; ++i) {
             group.recordPoll(100, 1000 + i * 100);
             group.recordWrite(10);
         }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
         assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
         assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
         assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
         assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
         assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
         assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
     }
 
     private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.