You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/30 21:59:06 UTC
[kafka] branch trunk updated: KAFKA-8389: Remove redundant
bookkeeping from MockProcessor (#6761)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4c0b18 KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)
a4c0b18 is described below
commit a4c0b1841a9fe69d040128129293db201f81fe4a
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 30 14:58:53 2019 -0700
KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)
Remove processedKeys / processedValues / processedWithTimestamps, as they are already covered by the processed list.
Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
.../kstream/internals/KGroupedStreamImplTest.java | 28 +++++++++----------
.../java/org/apache/kafka/test/MockProcessor.java | 32 ++++++----------------
2 files changed, 22 insertions(+), 38 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 0c850e7..b19fd55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
@@ -43,6 +42,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
@@ -575,19 +575,19 @@ public class KGroupedStreamImplTest {
driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L));
driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L));
}
- assertThat(supplier.theCapturedProcessor().processedWithTimestamps, equalTo(Arrays.asList(
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
- new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
- new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
+ assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList(
+ MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
+ MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
+ MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
+ MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
+ MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
+ MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
+ MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
+ MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
+ MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
+ MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
+ MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
+ MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
)));
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 0c7a015..f1e0597 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.test;
-import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -30,13 +29,9 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
-@SuppressWarnings("WeakerAccess")
public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<String> processed = new ArrayList<>();
- public final ArrayList<K> processedKeys = new ArrayList<>();
- public final ArrayList<V> processedValues = new ArrayList<>();
- public final ArrayList<KeyValueTimestamp> processedWithTimestamps = new ArrayList<>();
public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
@@ -81,19 +76,13 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
@Override
public void process(final K key, final V value) {
- processedKeys.add(key);
- processedValues.add(value);
- processedWithTimestamps.add(new KeyValueTimestamp<>(key, value, context().timestamp()));
if (value != null) {
lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
} else {
lastValueAndTimestampPerKey.remove(key);
}
- processed.add(
- (key == null ? "null" : key) +
- ":" + (value == null ? "null" : value) +
- " (ts: " + context().timestamp() + ")"
- );
+
+ processed.add(makeRecord(key, value, context().timestamp()));
if (commitRequested) {
context().commit();
@@ -101,24 +90,19 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
}
}
- public void checkAndClearProcessResult(final String... expected) {
- assertEquals("the number of outputs:" + processed, expected.length, processed.size());
- for (int i = 0; i < expected.length; i++) {
- assertEquals("output[" + i + "]:", expected[i], processed.get(i));
- }
-
- processed.clear();
- processedWithTimestamps.clear();
+ public static String makeRecord(final Object key, final Object value, final long timestamp) {
+ return (key == null ? "null" : key) +
+ ":" + (value == null ? "null" : value) +
+ " (ts: " + timestamp + ")";
}
- public void checkAndClearProcessResult(final KeyValueTimestamp... expected) {
+ public void checkAndClearProcessResult(final String... expected) {
assertEquals("the number of outputs:" + processed, expected.length, processed.size());
for (int i = 0; i < expected.length; i++) {
- assertEquals("output[" + i + "]:", expected[i], processedWithTimestamps.get(i));
+ assertEquals("output[" + i + "]:", expected[i], processed.get(i));
}
processed.clear();
- processedWithTimestamps.clear();
}
public void requestCommit() {