You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/07 22:08:56 UTC
[kafka] branch trunk updated: KAFKA-4641: Add more unit test for
stream thread (#4531)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d69a79 KAFKA-4641: Add more unit test for stream thread (#4531)
5d69a79 is described below
commit 5d69a79948bce0fab4705c7d7d0f3b02f2548c0d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Feb 7 14:08:53 2018 -0800
KAFKA-4641: Add more unit test for stream thread (#4531)
Before the patch, jacoco coverage test:
Element Missed Instructions Cov. Missed Branches Cov. Missed Cxty Missed Lines Missed Methods Missed Classes
Total 3,386 of 22,177 84% 336 of 1,639 79% 350 1,589 526 4,451 103 768 1 102
StreamThread 77% 76% 27 102 48 299 1 31 0 1
After the patch:
Element Missed Instructions Cov. Missed Branches Cov. Missed Cxty Missed Lines Missed Methods Missed Classes
Total 3,329 of 22,180 84% 329 of 1,639 79% 345 1,590 516 4,452 102 769 1 102
StreamThread 81% 80% 23 103 39 300 1 32 0 1
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 6 +-
.../processor/internals/StreamThreadTest.java | 161 ++++++++++++++++++++-
2 files changed, 161 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a293..5e25d02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public class StreamThread extends Thread {
return sb.toString();
}
- // this is for testing only
+ // the following are for testing only
TaskManager taskManager() {
return taskManager;
}
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+ return standbyRecords;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14..cc05604 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public class StreamThreadTest {
}
private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+ private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
// task0 is unused
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
+ private final TaskId task3 = new TaskId(1, 1);
private Properties configProps(final boolean enableEos) {
return new Properties() {
@@ -129,7 +139,7 @@ public class StreamThreadTest {
public void testPartitionAssignmentChangeForSingleGroup() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
- final StreamThread thread = getStreamThread();
+ final StreamThread thread = createStreamThread(clientId, config, false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public class StreamThreadTest {
}
}
- private StreamThread getStreamThread() {
- return createStreamThread(clientId, config, false);
- }
-
@Test
public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@@ -759,6 +765,151 @@ public class StreamThreadTest {
assertTrue(threadMetadata.activeTasks().isEmpty());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldUpdateStandbyTask() {
+ final String storeName1 = "count-one";
+ final String storeName2 = "table-two";
+ final String changelogName = applicationId + "-" + storeName1 + "-changelog";
+ final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+ final TopicPartition partition2 = t2p1;
+ internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+ .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+ internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+ final StreamThread thread = createStreamThread(clientId, config, false);
+ final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+ restoreConsumer.updatePartitions(changelogName,
+ Collections.singletonList(new PartitionInfo(changelogName,
+ 1,
+ null,
+ new Node[0],
+ new Node[0])));
+
+ restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+ restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+ restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+ // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
+ clientSupplier.consumer.assign(Utils.mkSet(partition2));
+ clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ }
+
+ thread.setState(StreamThread.State.RUNNING);
+
+ thread.rebalanceListener.onPartitionsRevoked(null);
+
+ final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+ // assign single partition
+ standbyTasks.put(task1, Collections.singleton(t1p1));
+ standbyTasks.put(task3, Collections.singleton(t2p1));
+
+ thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+ thread.runOnce(-1);
+
+ final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
+ final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
+ final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+ final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+ assertEquals(10L, store1.approximateNumEntries());
+ assertEquals(5L, store2.approximateNumEntries());
+ assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
+ assertEquals(1, thread.standbyRecords().size());
+ assertEquals(5, thread.standbyRecords().get(partition2).size());
+ }
+
+ @Test
+ public void shouldPunctuateActiveTask() {
+ final List<Long> punctuatedStreamTime = new ArrayList<>();
+ final List<Long> punctuatedWallClockTime = new ArrayList<>();
+ final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+ @Override
+ public Processor<Object, Object> get() {
+ return new Processor<Object, Object>() {
+ @Override
+ public void init(ProcessorContext context) {
+ context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ punctuatedStreamTime.add(timestamp);
+ }
+ });
+ context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ punctuatedWallClockTime.add(timestamp);
+ }
+ });
+ }
+
+ @Override
+ public void process(Object key, Object value) { }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void punctuate(long timestamp) { }
+
+ @Override
+ public void close() { }
+ };
+ }
+ };
+
+ internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+
+ final StreamThread thread = createStreamThread(clientId, config, false);
+
+ thread.setState(StreamThread.State.RUNNING);
+
+ thread.rebalanceListener.onPartitionsRevoked(null);
+ final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+ // assign single partition
+ assignedPartitions.add(t1p1);
+ activeTasks.put(task1, Collections.singleton(t1p1));
+
+ thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+ clientSupplier.consumer.assign(assignedPartitions);
+ clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+ thread.runOnce(-1);
+
+ assertEquals(0, punctuatedStreamTime.size());
+ assertEquals(0, punctuatedWallClockTime.size());
+
+ mockTime.sleep(100L);
+ for (long i = 0L; i < 10L; i++) {
+ clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ }
+
+ thread.runOnce(-1);
+
+ assertEquals(1, punctuatedStreamTime.size());
+ assertEquals(1, punctuatedWallClockTime.size());
+
+ mockTime.sleep(100L);
+
+ thread.runOnce(-1);
+
+ // we should skip stream time punctuation, only trigger wall-clock time punctuation
+ assertEquals(1, punctuatedStreamTime.size());
+ assertEquals(2, punctuatedWallClockTime.size());
+ }
+
@Test
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
final StreamThread thread = createStreamThread(clientId, config, false);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.