You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/05/22 21:24:38 UTC
[kafka] branch trunk updated: MINOR: Fix deprecation warnings in
SlidingWindowedCogroupedKStreamImplTest (#10703)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47796d2 MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
47796d2 is described below
commit 47796d2f8781447c8e7de716841db438cde9cc3c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat May 22 14:22:42 2021 -0700
MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../internals/SlidingWindowedCogroupedKStreamImplTest.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 6be96b9..5a06341 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -59,6 +59,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2";
private static final String OUTPUT = "output";
+ private static final long WINDOW_SIZE_MS = 500L;
private final StreamsBuilder builder = new StreamsBuilder();
private KGroupedStream<String, String> groupedStream;
@@ -80,7 +81,8 @@ public class SlidingWindowedCogroupedKStreamImplTest {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
- windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)));
+ windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
+ WINDOW_SIZE_MS), ofMillis(2000L)));
}
@Test
@@ -130,7 +132,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo(
@@ -156,7 +158,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
- OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+ OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);
@@ -204,7 +206,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
public void slidingWindowAggregateOverlappingWindowsTest() {
final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))).aggregate(
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L))).aggregate(
MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -212,7 +214,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
TOPIC, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
- OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+ OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
testInputTopic.pipeInput("k1", "A", 500);
testInputTopic.pipeInput("k2", "A", 500);