You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/02 17:34:34 UTC
[kafka] branch trunk updated: KAFKA-5368: Add test for skipped-records metric (#4365)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 39f2d45 KAFKA-5368: Add test for skipped-records metric (#4365)
39f2d45 is described below
commit 39f2d4588fb1bf82748fa4c1025b7452ca1ff5ba
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Jan 2 09:34:31 2018 -0800
KAFKA-5368: Add test for skipped-records metric (#4365)
* KAFKA-5368: Add test for skipped-records metric
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/StreamThreadTest.java | 48 ++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index fca380f..cca7045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,10 +23,12 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -864,6 +867,51 @@ public class StreamThreadTest {
}
}
+ @Test // Checks that "skipped-records-total" grows by one per record added with timestamp -1 and is unchanged by records with timestamp 1.
+ public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+ // Build the thread with LogAndSkipOnInvalidTimestamp configured as the default timestamp extractor.
+ final Properties config = configProps(false);
+ config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
+ final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
+ // Step the thread through RUNNING -> PARTITIONS_REVOKED before the rebalance callback below (presumably required for the assignment to be accepted; confirm against StreamThread's state machine).
+ thread.setState(StreamThread.State.RUNNING);
+ thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+ // Hand the single partition t1p1 to task (0, t1p1.partition()); the second assignment map is left empty.
+ final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+ thread.taskManager().setAssignmentMetadata(
+ Collections.singletonMap(
+ new TaskId(0, t1p1.partition()),
+ assignedPartitions),
+ Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+ // Mirror the assignment on the mock consumer and give t1p1 a beginning offset of 0.
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(Collections.singleton(t1p1));
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+ // Look up the metric by name, group and client-id tag (the thread name); no record has been added yet, so it must read 0.0.
+ final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+ assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+ // offset starts at -1 so the first ++offset yields 0. Two records with timestamp -1, then one runOnce pass: metric expected at 2.0.
+ long offset = -1;
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+ // Four more records with timestamp -1: the expected total is cumulative, 2.0 + 4 = 6.0.
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+ // Two records with timestamp 1: the metric is expected to stay at 6.0.
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+ thread.runOnce(-1);
+ assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+ }
+
private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
assertEquals(state.name(), metadata.threadState());
assertTrue(metadata.activeTasks().isEmpty());
--
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.