You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/26 02:29:57 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10745: MINOR: add window verification to sliding-window co-group test

ableegoldman commented on a change in pull request #10745:
URL: https://github.com/apache/kafka/pull/10745#discussion_r639358048



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() {
             testInputTopic.pipeInput("k2", "B", 504);
             testInputTopic.pipeInput("k1", "B", 504);
 
-            final Set<TestRecord<String, String>> results = new HashSet<>();
-            while (!testOutputTopic.isEmpty()) {
-                final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
-                final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
-                    realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
-                results.add(nonWindowedRecord);
-            }
-            final Set<TestRecord<String, String>> expected = new HashSet<>();
-            expected.add(new TestRecord<>("k1", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 500L));
-            expected.add(new TestRecord<>("k2", "0+A", null, 501L));
-            expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
-            expected.add(new TestRecord<>("k1", "0+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
-            expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
-            expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+B", null, 504L));
-            expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
+            final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();
+
+            final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
+            // k1-A-500
+            expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L));

Review comment:
       >A {@link TimeWindow} covers a half-open time interval
   
   I was about to say we should just make `TimeWindow` un-opinionated, but this is literally the first thing in the javadocs for the class. So I'd say it's pretty clear about what it's representing -- totally missed this before, I thought it was just a basic container class 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org