You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/05/22 21:24:38 UTC

[kafka] branch trunk updated: MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47796d2  MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
47796d2 is described below

commit 47796d2f8781447c8e7de716841db438cde9cc3c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat May 22 14:22:42 2021 -0700

    MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (#10703)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../internals/SlidingWindowedCogroupedKStreamImplTest.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 6be96b9..5a06341 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -59,6 +59,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
     private static final String TOPIC = "topic";
     private static final String TOPIC2 = "topic2";
     private static final String OUTPUT = "output";
+    private static final long WINDOW_SIZE_MS = 500L;
     private final StreamsBuilder builder = new StreamsBuilder();
 
     private KGroupedStream<String, String> groupedStream;
@@ -80,7 +81,8 @@ public class SlidingWindowedCogroupedKStreamImplTest {
         groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
-        windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)));
+        windowedCogroupedStream = cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
+            WINDOW_SIZE_MS), ofMillis(2000L)));
     }
 
     @Test
@@ -130,7 +132,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
                 .with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L)))
                 .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
 
         assertThat(builder.build().describe().toString(), equalTo(
@@ -156,7 +158,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
             final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
                     TOPIC, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
-                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
 
             testInputTopic.pipeInput("k1", "A", 500);
             testInputTopic.pipeInput("k2", "A", 500);
@@ -204,7 +206,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
     public void slidingWindowAggregateOverlappingWindowsTest() {
 
         final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L))).aggregate(
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), ofMillis(2000L))).aggregate(
                         MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
@@ -212,7 +214,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
             final TestInputTopic<String, String> testInputTopic = driver.createInputTopic(
                     TOPIC, new StringSerializer(), new StringSerializer());
             final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
-                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());
+                    OUTPUT, new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS), new StringDeserializer());
 
             testInputTopic.pipeInput("k1", "A", 500);
             testInputTopic.pipeInput("k2", "A", 500);