You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/30 21:59:06 UTC

[kafka] branch trunk updated: KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4c0b18  KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)
a4c0b18 is described below

commit a4c0b1841a9fe69d040128129293db201f81fe4a
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 30 14:58:53 2019 -0700

    KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)
    
    Remove processedKeys / processedValues / processedWithTimestamps from MockProcessor, as the information they record is already covered by the processed list.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
 .../kstream/internals/KGroupedStreamImplTest.java  | 28 +++++++++----------
 .../java/org/apache/kafka/test/MockProcessor.java  | 32 ++++++----------------
 2 files changed, 22 insertions(+), 38 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 0c850e7..b19fd55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -43,6 +42,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -575,19 +575,19 @@ public class KGroupedStreamImplTest {
             driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L));
             driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L));
         }
-        assertThat(supplier.theCapturedProcessor().processedWithTimestamps, equalTo(Arrays.asList(
-            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
-            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
-            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
-            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
-            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
-            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
-            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
-            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
-            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
-            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
-            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
-            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
+        assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList(
+            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
+            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
+            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
+            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
+            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
+            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
+            MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
+            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
+            MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
+            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
+            MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
+            MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
         )));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 0c7a015..f1e0597 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -30,13 +29,9 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
-@SuppressWarnings("WeakerAccess")
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     public final ArrayList<String> processed = new ArrayList<>();
-    public final ArrayList<K> processedKeys = new ArrayList<>();
-    public final ArrayList<V> processedValues = new ArrayList<>();
-    public final ArrayList<KeyValueTimestamp> processedWithTimestamps = new ArrayList<>();
     public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
 
     public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
@@ -81,19 +76,13 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     @Override
     public void process(final K key, final V value) {
-        processedKeys.add(key);
-        processedValues.add(value);
-        processedWithTimestamps.add(new KeyValueTimestamp<>(key, value, context().timestamp()));
         if (value != null) {
             lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
         } else {
             lastValueAndTimestampPerKey.remove(key);
         }
-        processed.add(
-            (key == null ? "null" : key) +
-            ":" + (value == null ? "null" : value) +
-            " (ts: " + context().timestamp() + ")"
-        );
+
+        processed.add(makeRecord(key, value, context().timestamp()));
 
         if (commitRequested) {
             context().commit();
@@ -101,24 +90,19 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
         }
     }
 
-    public void checkAndClearProcessResult(final String... expected) {
-        assertEquals("the number of outputs:" + processed, expected.length, processed.size());
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
-        }
-
-        processed.clear();
-        processedWithTimestamps.clear();
+    public static String makeRecord(final Object key, final Object value, final long timestamp) {
+        return (key == null ? "null" : key) +
+            ":" + (value == null ? "null" : value) +
+            " (ts: " + timestamp + ")";
     }
 
-    public void checkAndClearProcessResult(final KeyValueTimestamp... expected) {
+    public void checkAndClearProcessResult(final String... expected) {
         assertEquals("the number of outputs:" + processed, expected.length, processed.size());
         for (int i = 0; i < expected.length; i++) {
-            assertEquals("output[" + i + "]:", expected[i], processedWithTimestamps.get(i));
+            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
         }
 
         processed.clear();
-        processedWithTimestamps.clear();
     }
 
     public void requestCommit() {