You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/12 16:35:42 UTC
[kafka] branch trunk updated: KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e35a2b KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)
4e35a2b is described below
commit 4e35a2bfb7f3b0437a27bb58b8cb39339f750064
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Apr 13 00:35:37 2018 +0800
KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)
Have Console consumer support TimeWindowedDeserializer/SessionWindowedDeserializer.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../main/scala/kafka/tools/ConsoleConsumer.scala | 47 ++++-
.../unit/kafka/tools/ConsoleConsumerTest.scala | 19 ++
.../KStreamAggregationIntegrationTest.java | 232 ++++++++++++++-------
3 files changed, 221 insertions(+), 77 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 24fa583..9df4fb4 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils
+import scala.collection.JavaConversions
import scala.collection.JavaConverters._
/**
@@ -45,6 +46,11 @@ import scala.collection.JavaConverters._
object ConsoleConsumer extends Logging {
var messageCount = 0
+ // Keep these names the same as StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
+ // and StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS.
+ // Visible for testing.
+ private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
+ private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
private val shutdownLatch = new CountDownLatch(1)
@@ -291,7 +297,17 @@ object ConsoleConsumer extends Logging {
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
- val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
+ val messageFormatterArgOpt = parser.accepts("property",
+ "The properties to initialize the message formatter. Default properties include:\n" +
+ "\tprint.timestamp=true|false\n" +
+ "\tprint.key=true|false\n" +
+ "\tprint.value=true|false\n" +
+ "\tkey.separator=<key.separator>\n" +
+ "\tline.separator=<line.separator>\n" +
+ "\tkey.deserializer=<key.deserializer>\n" +
+ "\tvalue.deserializer=<value.deserializer>\n" +
+ "\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
+ "\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
@@ -328,6 +344,18 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
+ val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
+ "inner serde for key when windowed deserialzier is used; would be ignored otherwise. " +
+ "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+ .withRequiredArg
+ .describedAs("inner serde for key")
+ .ofType(classOf[String])
+ val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
+ "inner serde for value when windowed deserialzier is used; would be ignored otherwise. " +
+ "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+ .withRequiredArg
+ .describedAs("inner serde for values")
+ .ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
@@ -372,6 +400,8 @@ object ConsoleConsumer extends Logging {
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
+ val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
+ val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -381,6 +411,13 @@ object ConsoleConsumer extends Logging {
if (valueDeserializer != null && !valueDeserializer.isEmpty) {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
}
+ if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
+ formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
+ }
+ if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
+ formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
+ }
+
formatter.init(formatterArgs)
if (useOldConsumer) {
@@ -521,11 +558,15 @@ class DefaultMessageFormatter extends MessageFormatter {
if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
- if (props.containsKey("key.deserializer"))
+ if (props.containsKey("key.deserializer")) {
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+ keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
+ }
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
- if (props.containsKey("value.deserializer"))
+ if (props.containsKey("value.deserializer")) {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+ valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
+ }
}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9ae8b96..f5195c3 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -537,4 +538,22 @@ class ConsoleConsumerTest {
Exit.resetExitProcedure()
}
+
+ @Test
+ def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
+ "--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde",
+ "--property", "my-test1=abc"
+ )
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+ assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+ val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
+ assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
+ assertTrue(config.formatterArgs.containsKey("my-test1"))
+ assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4527c19..fc673d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -16,12 +16,14 @@
*/
package org.apache.kafka.streams.integration;
+import kafka.tools.ConsoleConsumer;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -43,10 +45,14 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -60,14 +66,18 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
@@ -205,32 +216,36 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
produceMessages(secondBatchTimestamp);
+ Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream
.windowedBy(TimeWindows.of(500L))
.reduce(reducer)
- .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
- @Override
- public String apply(final Windowed<String> windowedKey, final String value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- })
- .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
startStreams();
- final List<KeyValue<String, String>> windowedOutput = receiveMessages(
- new StringDeserializer(),
+ final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
+ new TimeWindowedDeserializer<String>(),
new StringDeserializer(),
+ String.class,
15);
- final Comparator<KeyValue<String, String>>
+ // read from ConsoleConsumer
+ String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new StringDeserializer(),
+ String.class,
+ 15);
+
+ final Comparator<KeyValue<Windowed<String>, String>>
comparator =
- new Comparator<KeyValue<String, String>>() {
+ new Comparator<KeyValue<Windowed<String>, String>>() {
@Override
- public int compare(final KeyValue<String, String> o1,
- final KeyValue<String, String> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
+ public int compare(final KeyValue<Windowed<String>, String> o1,
+ final KeyValue<Windowed<String>, String> o2) {
+ final int keyComparison = o1.key.key().compareTo(o2.key.key());
+ return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
}
};
@@ -238,25 +253,36 @@ public class KStreamAggregationIntegrationTest {
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
- assertThat(windowedOutput, is(
- Arrays.asList(
- new KeyValue<>("A@" + firstBatchWindow, "A"),
- new KeyValue<>("A@" + secondBatchWindow, "A"),
- new KeyValue<>("A@" + secondBatchWindow, "A:A"),
- new KeyValue<>("B@" + firstBatchWindow, "B"),
- new KeyValue<>("B@" + secondBatchWindow, "B"),
- new KeyValue<>("B@" + secondBatchWindow, "B:B"),
- new KeyValue<>("C@" + firstBatchWindow, "C"),
- new KeyValue<>("C@" + secondBatchWindow, "C"),
- new KeyValue<>("C@" + secondBatchWindow, "C:C"),
- new KeyValue<>("D@" + firstBatchWindow, "D"),
- new KeyValue<>("D@" + secondBatchWindow, "D"),
- new KeyValue<>("D@" + secondBatchWindow, "D:D"),
- new KeyValue<>("E@" + firstBatchWindow, "E"),
- new KeyValue<>("E@" + secondBatchWindow, "E"),
- new KeyValue<>("E@" + secondBatchWindow, "E:E")
- )
- ));
+ List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
+ );
+ assertThat(windowedOutput, is(expectResult));
+
+ Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
+ expectResultString.add(eachRecord.toString());
+ }
+
+ // check that every message is contained in the expected result
+ String[] allRecords = resultFromConsoleConsumer.split("\n");
+ for (String record: allRecords) {
+ record = "KeyValue(" + record + ")";
+ assertTrue(expectResultString.contains(record));
+ }
}
@SuppressWarnings("deprecation")
@@ -309,34 +335,39 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
produceMessages(secondTimestamp);
+ Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
aggregator,
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
)
- .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
- @Override
- public String apply(final Windowed<String> windowedKey, final Integer value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- })
- .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
startStreams();
- final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
- new StringDeserializer(),
+ final List<KeyValue<Windowed<String>, Integer>> windowedMessages = receiveMessages(
+ new TimeWindowedDeserializer<String>(),
new IntegerDeserializer(),
+ String.class,
15);
- final Comparator<KeyValue<String, Integer>>
+ // read from ConsoleConsumer
+ String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new IntegerDeserializer(),
+ String.class,
+ 15);
+
+ final Comparator<KeyValue<Windowed<String>, Integer>>
comparator =
- new Comparator<KeyValue<String, Integer>>() {
+ new Comparator<KeyValue<Windowed<String>, Integer>>() {
@Override
- public int compare(final KeyValue<String, Integer> o1,
- final KeyValue<String, Integer> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
+ public int compare(final KeyValue<Windowed<String>, Integer> o1,
+ final KeyValue<Windowed<String>, Integer> o2) {
+ final int keyComparison = o1.key.key().compareTo(o2.key.key());
+ return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
}
};
@@ -345,24 +376,37 @@ public class KStreamAggregationIntegrationTest {
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
- assertThat(windowedMessages, is(
- Arrays.asList(
- new KeyValue<>("A@" + firstWindow, 1),
- new KeyValue<>("A@" + secondWindow, 1),
- new KeyValue<>("A@" + secondWindow, 2),
- new KeyValue<>("B@" + firstWindow, 1),
- new KeyValue<>("B@" + secondWindow, 1),
- new KeyValue<>("B@" + secondWindow, 2),
- new KeyValue<>("C@" + firstWindow, 1),
- new KeyValue<>("C@" + secondWindow, 1),
- new KeyValue<>("C@" + secondWindow, 2),
- new KeyValue<>("D@" + firstWindow, 1),
- new KeyValue<>("D@" + secondWindow, 1),
- new KeyValue<>("D@" + secondWindow, 2),
- new KeyValue<>("E@" + firstWindow, 1),
- new KeyValue<>("E@" + secondWindow, 1),
- new KeyValue<>("E@" + secondWindow, 2)
- )));
+ List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
+
+ assertThat(windowedMessages, is(expectResult));
+
+ Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
+ expectResultString.add(eachRecord.toString());
+ }
+
+ // check that every message is contained in the expected result
+ String[] allRecords = resultFromConsoleConsumer.split("\n");
+ for (String record: allRecords) {
+ record = "KeyValue(" + record + ")";
+ assertTrue(expectResultString.contains(record));
+ }
+
}
private void shouldCountHelper() throws Exception {
@@ -685,26 +729,66 @@ public class KStreamAggregationIntegrationTest {
kafkaStreams.start();
}
-
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer,
final Deserializer<V>
valueDeserializer,
final int numMessages)
throws InterruptedException {
+ return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
+ }
+
+ private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+ keyDeserializer,
+ final Deserializer<V>
+ valueDeserializer,
+ final Class innerClass,
+ final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties();
- consumerProperties
- .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+ if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
+ consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ }
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- consumerProperties,
- outputTopic,
- numMessages,
- 60 * 1000);
-
+ consumerProperties,
+ outputTopic,
+ numMessages,
+ 60 * 1000);
}
+ private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final Class innerClass,
+ final int numMessages) {
+ ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
+ PrintStream originalStream = System.out;
+ try (PrintStream newStream = new PrintStream(newConsole)) {
+ System.setOut(newStream);
+
+ String keySeparator = ", ";
+ // manually construct the console consumer argument array
+ String[] args = new String[] {
+ "--bootstrap-server", CLUSTER.bootstrapServers(),
+ "--from-beginning",
+ "--property", "print.key=true",
+ "--topic", outputTopic,
+ "--max-messages", String.valueOf(numMessages),
+ "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
+ "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
+ "--property", "key.separator=" + keySeparator,
+ "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
+ };
+
+ ConsoleConsumer.messageCount_$eq(0); //reset the message count
+ ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
+ newStream.flush();
+ System.setOut(originalStream);
+ return newConsole.toString();
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.