You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/02 17:34:34 UTC

[kafka] branch trunk updated: KAFKA-5368: Add test for skipped-records metric (#4365)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39f2d45  KAFKA-5368: Add test for skipped-records metric (#4365)
39f2d45 is described below

commit 39f2d4588fb1bf82748fa4c1025b7452ca1ff5ba
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Jan 2 09:34:31 2018 -0800

    KAFKA-5368: Add test for skipped-records metric (#4365)
    
    * KAFKA-5368: Add test for skipped-records metric
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/StreamThreadTest.java      | 48 ++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index fca380f..cca7045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,10 +23,12 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -864,6 +867,51 @@ public class StreamThreadTest {
         }
     }
 
+    @Test
+    public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+
+        final Properties config = configProps(false);
+        config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
+        final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
+
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+        final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+        thread.taskManager().setAssignmentMetadata(
+            Collections.singletonMap(
+                new TaskId(0, t1p1.partition()),
+                assignedPartitions),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(Collections.singleton(t1p1));
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        long offset = -1;
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+    }
+
     private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
         assertEquals(state.name(), metadata.threadState());
         assertTrue(metadata.activeTasks().isEmpty());

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].