You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/12 16:35:42 UTC

[kafka] branch trunk updated: KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e35a2b  KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)
4e35a2b is described below

commit 4e35a2bfb7f3b0437a27bb58b8cb39339f750064
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Apr 13 00:35:37 2018 +0800

    KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)
    
    Have Console consumer support TimeWindowedDeserializer/SessionWindowedDeserializer.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  47 ++++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala     |  19 ++
 .../KStreamAggregationIntegrationTest.java         | 232 ++++++++++++++-------
 3 files changed, 221 insertions(+), 77 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 24fa583..9df4fb4 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions
 import scala.collection.JavaConverters._
 
 /**
@@ -45,6 +46,11 @@ import scala.collection.JavaConverters._
 object ConsoleConsumer extends Logging {
 
   var messageCount = 0
+  // Keep the same names as StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
+  // and StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
+  // visible for testing
+  private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
+  private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
 
   private val shutdownLatch = new CountDownLatch(1)
 
@@ -291,7 +297,17 @@ object ConsoleConsumer extends Logging {
       .describedAs("class")
       .ofType(classOf[String])
       .defaultsTo(classOf[DefaultMessageFormatter].getName)
-    val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
+    val messageFormatterArgOpt = parser.accepts("property",
+      "The properties to initialize the message formatter. Default properties include:\n" +
+        "\tprint.timestamp=true|false\n" +
+        "\tprint.key=true|false\n" +
+        "\tprint.value=true|false\n" +
+        "\tkey.separator=<key.separator>\n" +
+        "\tline.separator=<line.separator>\n" +
+        "\tkey.deserializer=<key.deserializer>\n" +
+        "\tvalue.deserializer=<value.deserializer>\n" +
+        "\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
+        "\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -328,6 +344,18 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
+    val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
+      "inner serde for key when windowed deserializer is used; would be ignored otherwise. " +
+        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+      .withRequiredArg
+      .describedAs("inner serde for key")
+      .ofType(classOf[String])
+    val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
+      "inner serde for value when windowed deserializer is used; would be ignored otherwise. " +
+        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+      .withRequiredArg
+      .describedAs("inner serde for values")
+      .ofType(classOf[String])
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events of the consumer in addition to logging consumed " +
                                                        "messages. (This is specific for system tests.)")
@@ -372,6 +400,8 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
+    val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
@@ -381,6 +411,13 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
+    if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
+      formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
+    }
+    if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
+      formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
+    }
+
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
@@ -521,11 +558,15 @@ class DefaultMessageFormatter extends MessageFormatter {
     if (props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("key.deserializer"))
+    if (props.containsKey("key.deserializer")) {
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
+    }
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
-    if (props.containsKey("value.deserializer"))
+    if (props.containsKey("value.deserializer")) {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
+    }
   }
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9ae8b96..f5195c3 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -537,4 +538,22 @@ class ConsoleConsumerTest {
 
     Exit.resetExitProcedure()
   }
+
+  @Test
+  def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
+    // "my-test1" is an arbitrary custom property; it and the inner serde option must reach the formatter args
+    val innerKeySerdeOption = "--" + ConsoleConsumer.innerKeySerdeName
+    val consumerConfig = new ConsoleConsumer.ConsumerConfig(Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--property", "print.key=true",
+      "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      innerKeySerdeOption, "org.apache.kafka.common.serialization.Serdes$StringSerde",
+      "--property", "my-test1=abc"))
+    val passedArgs = consumerConfig.formatterArgs
+    assertTrue(consumerConfig.formatter.isInstanceOf[DefaultMessageFormatter])
+    val defaultFormatter = consumerConfig.formatter.asInstanceOf[DefaultMessageFormatter]
+    assertTrue(defaultFormatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
+    Seq("my-test1", ConsoleConsumer.innerKeySerdeName).foreach(key => assertTrue(passedArgs.containsKey(key)))
+  }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4527c19..fc673d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.tools.ConsoleConsumer;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -43,10 +45,14 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -60,14 +66,18 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
@@ -205,32 +216,36 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
         produceMessages(secondBatchTimestamp);
 
+        Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream
                 .windowedBy(TimeWindows.of(500L))
                 .reduce(reducer)
-                .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
-                    @Override
-                    public String apply(final Windowed<String> windowedKey, final String value) {
-                        return windowedKey.key() + "@" + windowedKey.window().start();
-                    }
-                })
-            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
         startStreams();
 
-        final List<KeyValue<String, String>> windowedOutput = receiveMessages(
-            new StringDeserializer(),
+        final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
+            new TimeWindowedDeserializer<String>(),
             new StringDeserializer(),
+            String.class,
             15);
 
-        final Comparator<KeyValue<String, String>>
+        // read from ConsoleConsumer
+        String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new StringDeserializer(),
+                String.class,
+                15);
+
+        final Comparator<KeyValue<Windowed<String>, String>>
             comparator =
-            new Comparator<KeyValue<String, String>>() {
+            new Comparator<KeyValue<Windowed<String>, String>>() {
                 @Override
-                public int compare(final KeyValue<String, String> o1,
-                                   final KeyValue<String, String> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<Windowed<String>, String> o1,
+                                   final KeyValue<Windowed<String>, String> o2) {
+                    final int keyComparison = o1.key.key().compareTo(o2.key.key());
+                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
                 }
             };
 
@@ -238,25 +253,36 @@ public class KStreamAggregationIntegrationTest {
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
-        assertThat(windowedOutput, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
-                new KeyValue<>("B@" + firstBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
-                new KeyValue<>("C@" + firstBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
-                new KeyValue<>("D@" + firstBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
-                new KeyValue<>("E@" + firstBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E:E")
-            )
-        ));
+        List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
+        );
+        assertThat(windowedOutput, is(expectResult));
+
+        Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
+            expectResultString.add(eachRecord.toString());
+        }
+
+        // check every message is contained in the expect result
+        String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (String record: allRecords) {
+            record = "KeyValue(" + record + ")";
+            assertTrue(expectResultString.contains(record));
+        }
     }
 
     @SuppressWarnings("deprecation")
@@ -309,34 +335,39 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
 
+        Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream.windowedBy(TimeWindows.of(500L))
                 .aggregate(
                         initializer,
                         aggregator,
                         Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
                 )
-                .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
-                    @Override
-                    public String apply(final Windowed<String> windowedKey, final Integer value) {
-                        return windowedKey.key() + "@" + windowedKey.window().start();
-                    }
-                })
-                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
-            new StringDeserializer(),
+        final List<KeyValue<Windowed<String>, Integer>> windowedMessages = receiveMessages(
+            new TimeWindowedDeserializer<String>(),
             new IntegerDeserializer(),
+            String.class,
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        // read from ConsoleConsumer
+        String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new IntegerDeserializer(),
+                String.class,
+                15);
+
+        final Comparator<KeyValue<Windowed<String>, Integer>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<Windowed<String>, Integer>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<Windowed<String>, Integer> o1,
+                                   final KeyValue<Windowed<String>, Integer> o2) {
+                    final int keyComparison = o1.key.key().compareTo(o2.key.key());
+                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
                 }
             };
 
@@ -345,24 +376,37 @@ public class KStreamAggregationIntegrationTest {
         final long firstWindow = firstTimestamp / 500 * 500;
         final long secondWindow = secondTimestamp / 500 * 500;
 
-        assertThat(windowedMessages, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
-            )));
+        List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
+
+        assertThat(windowedMessages, is(expectResult));
+
+        Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
+            expectResultString.add(eachRecord.toString());
+        }
+
+        // check every message is contained in the expect result
+        String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (String record: allRecords) {
+            record = "KeyValue(" + record + ")";
+            assertTrue(expectResultString.contains(record));
+        }
+
     }
 
     private void shouldCountHelper() throws Exception {
@@ -685,26 +729,66 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-
     private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                             keyDeserializer,
                                                         final Deserializer<V>
                                                             valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+        return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
+    }
+
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+                                                                keyDeserializer,
+                                                        final Deserializer<V>
+                                                                valueDeserializer,
+                                                        final Class innerClass,
+                                                        final int numMessages) throws InterruptedException {
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
+            consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+                    Serdes.serdeFrom(innerClass).getClass().getName());
+        }
         return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            consumerProperties,
-            outputTopic,
-            numMessages,
-            60 * 1000);
-
+                consumerProperties,
+                outputTopic,
+                numMessages,
+                60 * 1000);
     }
 
+    /** Runs ConsoleConsumer against outputTopic with System.out captured and returns everything it printed. */
+    private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
+                                                  final Deserializer<V> valueDeserializer,
+                                                  final Class innerClass,
+                                                  final int numMessages) {
+        ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
+        PrintStream originalStream = System.out;
+        try (PrintStream newStream = new PrintStream(newConsole)) {
+            System.setOut(newStream);
+            String keySeparator = ", ";
+            // manually construct the console consumer argument array
+            String[] args = new String[] {
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--from-beginning",
+                "--property", "print.key=true",
+                "--topic", outputTopic,
+                "--max-messages", String.valueOf(numMessages),
+                "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
+                "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
+                "--property", "key.separator=" + keySeparator,
+                "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
+            };
+            ConsoleConsumer.messageCount_$eq(0); //reset the message count
+            ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
+            newStream.flush();
+            return newConsole.toString();
+        } finally {
+            System.setOut(originalStream); // always restore stdout, even when the console consumer throws
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.