You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/07 22:08:56 UTC

[kafka] branch trunk updated: KAFKA-4641: Add more unit test for stream thread (#4531)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d69a79  KAFKA-4641: Add more unit test for stream thread (#4531)
5d69a79 is described below

commit 5d69a79948bce0fab4705c7d7d0f3b02f2548c0d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Feb 7 14:08:53 2018 -0800

    KAFKA-4641: Add more unit test for stream thread (#4531)
    
    JaCoCo coverage report before the patch:
    
    Element      | Missed Instructions | Cov. | Missed Branches | Cov. | Missed | Cxty  | Missed | Lines | Missed | Methods | Missed | Classes
    Total        | 3,386 of 22,177     | 84%  | 336 of 1,639    | 79%  | 350    | 1,589 | 526    | 4,451 | 103    | 768     | 1      | 102
    StreamThread |                     | 77%  |                 | 76%  | 27     | 102   | 48     | 299   | 1      | 31      | 0      | 1
    After the patch:
    
    Element      | Missed Instructions | Cov. | Missed Branches | Cov. | Missed | Cxty  | Missed | Lines | Missed | Methods | Missed | Classes
    Total        | 3,329 of 22,180     | 84%  | 329 of 1,639    | 79%  | 345    | 1,590 | 516    | 4,452 | 102    | 769     | 1      | 102
    StreamThread |                     | 81%  |                 | 80%  | 23     | 103   | 39     | 300   | 1      | 32      | 0      | 1
    
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  |   6 +-
 .../processor/internals/StreamThreadTest.java      | 161 ++++++++++++++++++++-
 2 files changed, 161 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a293..5e25d02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public class StreamThread extends Thread {
         return sb.toString();
     }
 
-    // this is for testing only
+    // the following are for testing only
     TaskManager taskManager() {
         return taskManager;
     }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+        return standbyRecords;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14..cc05604 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public class StreamThreadTest {
     }
 
     private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
 
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -129,7 +139,7 @@ public class StreamThreadTest {
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = getStreamThread();
+        final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public class StreamThreadTest {
         }
     }
 
-    private StreamThread getStreamThread() {
-        return createStreamThread(clientId, config, false);
-    }
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@@ -759,6 +765,151 @@ public class StreamThreadTest {
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldUpdateStandbyTask() {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName = applicationId + "-" + storeName1 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+        final TopicPartition partition2 = t2p1;
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+                .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName,
+                Collections.singletonList(new PartitionInfo(changelogName,
+                        1,
+                        null,
+                        new Node[0],
+                        new Node[0])));
+
+        restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
+        clientSupplier.consumer.assign(Utils.mkSet(partition2));
+        clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+        thread.runOnce(-1);
+
+        final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
+        final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(5L, store2.approximateNumEntries());
+        assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
+        assertEquals(1, thread.standbyRecords().size());
+        assertEquals(5, thread.standbyRecords().get(partition2).size());
+    }
+
+    @Test
+    public void shouldPunctuateActiveTask() {
+        final List<Long> punctuatedStreamTime = new ArrayList<>();
+        final List<Long> punctuatedWallClockTime = new ArrayList<>();
+        final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    @Override
+                    public void init(ProcessorContext context) {
+                        context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedStreamTime.add(timestamp);
+                            }
+                        });
+                        context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedWallClockTime.add(timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) { }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
+                    public void punctuate(long timestamp) { }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        thread.runOnce(-1);
+
+        assertEquals(0, punctuatedStreamTime.size());
+        assertEquals(0, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+        for (long i = 0L; i < 10L; i++) {
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.runOnce(-1);
+
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(1, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+
+        thread.runOnce(-1);
+
+        // we should skip stream time punctuation, only trigger wall-clock time punctuation
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(2, punctuatedWallClockTime.size());
+    }
+
     @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
         final StreamThread thread = createStreamThread(clientId, config, false);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.