Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/22 22:53:49 UTC

[GitHub] [kafka] mjsax opened a new pull request #10745: MINOR: add window verification to sliding-window co-group test

mjsax opened a new pull request #10745:
URL: https://github.com/apache/kafka/pull/10745


   Follow-up to #10703. /cc @ijuma
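
   For readers without the diff at hand, here is a minimal sketch of why the test may benefit from verifying windows. The removed test code stripped the window from each output key and collected the results in a `HashSet`, so two results that differ only in their window would collapse into one entry. The record layout, the helper names, and the window bounds below are hypothetical stand-ins chosen for illustration; they are not Kafka's `TestRecord` or `Windowed` classes, and the values are not taken from the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WindowedComparisonSketch {

    // Hypothetical stand-in for an output record:
    // [key, windowStart, windowEnd, value, timestamp].
    public static List<Object> record(String key, long start, long end, String value, long ts) {
        return Arrays.<Object>asList(key, start, end, value, ts);
    }

    // Drops the window bounds and keeps only [key, value, timestamp],
    // mirroring what the removed test code did before comparing.
    public static List<Object> stripWindow(List<Object> r) {
        return Arrays.<Object>asList(r.get(0), r.get(3), r.get(4));
    }

    // Two illustrative results that differ only in their window bounds.
    public static List<List<Object>> sampleOutput() {
        List<List<Object>> out = new ArrayList<>();
        out.add(record("k1", 0L, 500L, "0+A", 500L));
        out.add(record("k1", 1L, 501L, "0+A", 500L));
        return out;
    }

    // Collects the window-less view of every record into a set.
    public static Set<List<Object>> strippedSet(List<List<Object>> records) {
        Set<List<Object>> set = new HashSet<>();
        for (List<Object> r : records) {
            set.add(stripWindow(r));
        }
        return set;
    }

    public static void main(String[] args) {
        List<List<Object>> output = sampleOutput();
        // The list keeps both records and their windows.
        System.out.println("list size: " + output.size());
        // The set of stripped records cannot tell the two apart.
        System.out.println("stripped set size: " + strippedSet(output).size());
    }
}
```

   Comparing full lists of windowed records, as the new test code does, also checks output order, which a set comparison cannot.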


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10745: MINOR: add window verification to sliding-window co-group test

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10745:
URL: https://github.com/apache/kafka/pull/10745#discussion_r639358048



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() {
             testInputTopic.pipeInput("k2", "B", 504);
             testInputTopic.pipeInput("k1", "B", 504);
 
-            final Set<TestRecord<String, String>> results = new HashSet<>();
-            while (!testOutputTopic.isEmpty()) {
-                final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
-                final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
-                    realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
-                results.add(nonWindowedRecord);
-            }
-            final Set<TestRecord<String, String>> expected = new HashSet<>();
-            expected.add(new TestRecord<>("k1", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 501L));
-            expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
-            expected.add(new TestRecord<>("k1", "0+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
+            final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();
+
+            final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
+            // k1-A-500
+            expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L));

Review comment:
       > A {@link TimeWindow} covers a half-open time interval
   
   I was about to say we should just make `TimeWindow` un-opinionated, but this is literally the first line of the class's Javadoc, so it's pretty clear about what it represents. I totally missed this before; I thought it was just a basic container class.
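
   To make the distinction concrete, here is a minimal sketch of the two boundary conventions. It assumes, as the Kafka Streams documentation describes, that sliding windows are inclusive on both ends, while the Javadoc quoted above describes `TimeWindow` as half-open. The class and helper names are hypothetical and only illustrate the interval arithmetic; they are not Kafka APIs.

```java
public class WindowBoundsSketch {

    // Hypothetical helper: membership in a half-open interval [start, end).
    public static boolean inHalfOpen(long start, long end, long ts) {
        return ts >= start && ts < end;
    }

    // Hypothetical helper: membership in a closed interval [start, end].
    public static boolean inClosed(long start, long end, long ts) {
        return ts >= start && ts <= end;
    }

    public static void main(String[] args) {
        // Bounds taken from the test line above: window (0, 500), record timestamp 500.
        long start = 0L;
        long end = 500L;
        long ts = 500L;

        // Read as half-open, the window excludes its own end timestamp.
        System.out.println("half-open contains 500: " + inHalfOpen(start, end, ts)); // false
        // Read as closed, the record at 500 falls inside the window.
        System.out.println("closed contains 500: " + inClosed(start, end, ts));      // true
    }
}
```

   Under the half-open reading, a window written as (0, 500) would not contain the record at timestamp 500 that the test expects in it, which is why the choice of window class matters here.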







[GitHub] [kafka] mjsax merged pull request #10745: MINOR: add window verification to sliding-window co-group test

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10745:
URL: https://github.com/apache/kafka/pull/10745


   





[GitHub] [kafka] mjsax commented on a change in pull request #10745: MINOR: add window verification to sliding-window co-group test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10745:
URL: https://github.com/apache/kafka/pull/10745#discussion_r637459718



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() {
             testInputTopic.pipeInput("k2", "B", 504);
             testInputTopic.pipeInput("k1", "B", 504);
 
-            final Set<TestRecord<String, String>> results = new HashSet<>();
-            while (!testOutputTopic.isEmpty()) {
-                final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
-                final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
-                    realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
-                results.add(nonWindowedRecord);
-            }
-            final Set<TestRecord<String, String>> expected = new HashSet<>();
-            expected.add(new TestRecord<>("k1", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 501L));
-            expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
-            expected.add(new TestRecord<>("k1", "0+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
+            final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();
+
+            final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
+            // k1-A-500
+            expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L));

Review comment:
       Just realized that using `TimeWindow` here is actually incorrect... (filed https://issues.apache.org/jira/browse/KAFKA-12839)
   
   /cc @lct45 @ableegoldman



