Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/11 20:57:07 UTC
[2/2] kafka git commit: KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates
KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates
Also made a pass over the streams unit tests, with the following changes:
1. Removed three integration tests as they are already covered by other integration tests.
2. Merged `KGroupedTableImplTest` into `KTableAggregateTest`.
3. Used mocks whenever possible to reduce code duplication (a sketch of the shared mock follows below).
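The consolidated mock referenced in change 3 is used via calls like `MockKeyValueMapper.<String, String>SelectValueMapper()` in the hunks below. Its own diff is listed in the diffstat but is not reproduced in this part of the mail, so the following is only a sketch of the shape those call sites imply, not the committed implementation:

package org.apache.kafka.test;

import org.apache.kafka.streams.kstream.KeyValueMapper;

public final class MockKeyValueMapper {
    // Sketch: a factory for a mapper that re-keys each record by its value,
    // replacing the anonymous KeyValueMapper classes removed at each call site.
    public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
        return new KeyValueMapper<K, V, V>() {
            @Override
            public V apply(K key, V value) {
                return value;
            }
        };
    }
}

Re-keying by value is exactly what the removed inline KeyValueMapper classes did, as the first hunk below shows.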
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1604 from guozhangwang/Kminor-unit-tests-consolidation
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/136a8fab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/136a8fab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/136a8fab
Branch: refs/heads/trunk
Commit: 136a8fabca8e266f67897cf5471b2e41c0a341be
Parents: c439268
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jul 11 13:57:02 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 11 13:57:02 2016 -0700
----------------------------------------------------------------------
.../InternalTopicIntegrationTest.java | 14 +-
.../integration/JoinIntegrationTest.java | 259 ----------
.../KGroupedStreamIntegrationTest.java | 472 -------------------
.../KStreamAggregationIntegrationTest.java | 466 ++++++++++++++++++
.../KStreamKTableJoinIntegrationTest.java | 258 ++++++++++
.../integration/KStreamRepartitionJoinTest.java | 69 +--
.../integration/MapFunctionIntegrationTest.java | 122 -----
.../integration/PassThroughIntegrationTest.java | 108 -----
.../integration/WordCountIntegrationTest.java | 154 ------
.../internals/KGroupedTableImplTest.java | 79 ----
.../kstream/internals/KTableAggregateTest.java | 57 ++-
.../kstream/internals/KTableSourceTest.java | 4 +-
.../apache/kafka/test/MockKeyValueMapper.java | 23 +-
13 files changed, 808 insertions(+), 1277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 15469c7..968e060 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -24,13 +24,15 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -41,8 +43,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Properties;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import kafka.admin.AdminUtils;
import kafka.log.LogConfig;
import kafka.utils.ZKStringSerializer$;
@@ -135,12 +135,8 @@ public class InternalTopicIntegrationTest {
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
- }).groupBy(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- }).count("Counts").toStream();
+ }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+ .count("Counts").toStream();
wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
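The hunk above begins mid-chain, so for readability here is the topology it leaves in place, condensed into one plain listing (a restatement of the surrounding diff, not additional committed code; `textLines` is an assumed name for the upstream KStream<String, String>):

KStream<String, Long> wordCounts = textLines
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            // Split each line into lowercase words.
            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
        }
    })
    // Re-key each record by the word itself, via the shared mock mapper.
    .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
    .count("Counts")
    .toStream();
wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);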
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index f99a142..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.kafka.streams.integration;
-
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test that demonstrates how to perform a join between a KStream and a
- * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
- */
-public class JoinIntegrationTest {
- @ClassRule
- public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
- private static final String USER_CLICKS_TOPIC = "user-clicks";
- private static final String USER_REGIONS_TOPIC = "user-regions";
- private static final String OUTPUT_TOPIC = "output-topic";
-
- @BeforeClass
- public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(USER_CLICKS_TOPIC);
- CLUSTER.createTopic(USER_REGIONS_TOPIC);
- CLUSTER.createTopic(OUTPUT_TOPIC);
- }
-
- /**
- * Tuple for a region and its associated number of clicks.
- */
- private static final class RegionWithClicks {
-
- private final String region;
- private final long clicks;
-
- public RegionWithClicks(String region, long clicks) {
- if (region == null || region.isEmpty()) {
- throw new IllegalArgumentException("region must be set");
- }
- if (clicks < 0) {
- throw new IllegalArgumentException("clicks must not be negative");
- }
- this.region = region;
- this.clicks = clicks;
- }
-
- public String getRegion() {
- return region;
- }
-
- public long getClicks() {
- return clicks;
- }
-
- }
-
- @Test
- public void shouldCountClicksPerRegion() throws Exception {
- // Input 1: Clicks per user (multiple records allowed per user).
- List<KeyValue<String, Long>> userClicks = Arrays.asList(
- new KeyValue<>("alice", 13L),
- new KeyValue<>("bob", 4L),
- new KeyValue<>("chao", 25L),
- new KeyValue<>("bob", 19L),
- new KeyValue<>("dave", 56L),
- new KeyValue<>("eve", 78L),
- new KeyValue<>("alice", 40L),
- new KeyValue<>("fang", 99L)
- );
-
- // Input 2: Region per user (multiple records allowed per user).
- List<KeyValue<String, String>> userRegions = Arrays.asList(
- new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */
- new KeyValue<>("bob", "americas"),
- new KeyValue<>("chao", "asia"),
- new KeyValue<>("dave", "europe"),
- new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
- new KeyValue<>("eve", "americas"),
- new KeyValue<>("fang", "asia")
- );
-
- List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
- new KeyValue<>("europe", 13L),
- new KeyValue<>("americas", 4L),
- new KeyValue<>("asia", 25L),
- new KeyValue<>("americas", 23L),
- new KeyValue<>("europe", 69L),
- new KeyValue<>("americas", 101L),
- new KeyValue<>("europe", 109L),
- new KeyValue<>("asia", 124L)
- );
-
- //
- // Step 1: Configure and start the processor topology.
- //
- final Serde<String> stringSerde = Serdes.String();
- final Serde<Long> longSerde = Serdes.Long();
-
- Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
- streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
- TestUtils.tempDirectory().getPath());
-
- // Remove any state from previous test runs
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
- KStreamBuilder builder = new KStreamBuilder();
-
- // This KStream contains information such as "alice" -> 13L.
- //
- // Because this is a KStream ("record stream"), multiple records for the same user will be
- // considered as separate click-count events, each of which will be added to the total count.
- KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
-
- // This KTable contains information such as "alice" -> "europe".
- //
- // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
- // record key will be considered at the time when a new user-click record (see above) is
- // received for the `leftJoin` below. Any previous region values are being considered out of
- // date. This behavior is quite different to the KStream for user clicks above.
- //
- // For example, the user "alice" will be considered to live in "europe" (although originally she
- // lived in "asia") because, at the time her first user-click record is being received and
- // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
- // (which overrides her previous region value of "asia").
- KTable<String, String> userRegionsTable =
- builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
-
- // Compute the number of clicks per region, e.g. "europe" -> 13L.
- //
- // The resulting KTable is continuously being updated as new data records are arriving in the
- // input KStream `userClicksStream` and input KTable `userRegionsTable`.
- KTable<String, Long> clicksPerRegion = userClicksStream
- // Join the stream against the table.
- //
- // Null values possible: In general, null values are possible for region (i.e. the value of
- // the KTable we are joining against) so we must guard against that (here: by setting the
- // fallback region "UNKNOWN"). In this specific example this is not really needed because
- // we know, based on the test setup, that all users have appropriate region entries at the
- // time we perform the join.
- //
- // Also, we need to return a tuple of (region, clicks) for each user. But because Java does
- // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
- // achieve the same effect.
- .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
- @Override
- public RegionWithClicks apply(Long clicks, String region) {
- RegionWithClicks regionWithClicks = new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
- return regionWithClicks;
- }
- })
- // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
- .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
- @Override
- public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
- return new KeyValue<>(value.getRegion(), value.getClicks());
- }
- })
- // Compute the total per region by summing the individual click counts per region.
- .groupByKey(stringSerde, longSerde)
- .reduce(new Reducer<Long>() {
- @Override
- public Long apply(Long value1, Long value2) {
- return value1 + value2;
- }
- }, "ClicksPerRegionUnwindowed");
-
- // Write the (continuously updating) results to the output topic.
- clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
-
- KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
- streams.start();
-
- //
- // Step 2: Publish user-region information.
- //
- // To keep this code example simple and easier to understand/reason about, we publish all
- // user-region records before any user-click records (cf. step 3). In practice though,
- // data records would typically be arriving concurrently in both input streams/topics.
- Properties userRegionsProducerConfig = new Properties();
- userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
-
- //
- // Step 3: Publish some user click events.
- //
- Properties userClicksProducerConfig = new Properties();
- userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
-
- //
- // Step 4: Verify the application's output data.
- //
- Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
- consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
- OUTPUT_TOPIC, expectedClicksPerRegion.size());
- streams.close();
- assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
deleted file mode 100644
index 36340b9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-public class KGroupedStreamIntegrationTest {
-
- @ClassRule
- public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
- new EmbeddedSingleNodeKafkaCluster();
- private static volatile int testNo = 0;
- private KStreamBuilder builder;
- private Properties streamsConfiguration;
- private KafkaStreams kafkaStreams;
- private String streamOneInput;
- private String outputTopic;
- private KGroupedStream<String, String> groupedStream;
- private Reducer<String> reducer;
- private Initializer<Integer> initializer;
- private Aggregator<String, String, Integer> aggregator;
- private KStream<Integer, String> stream;
-
-
- @Before
- public void before() {
- testNo++;
- builder = new KStreamBuilder();
- createTopics();
- streamsConfiguration = new Properties();
- String applicationId = "kgrouped-stream-test-" +
- testNo;
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-
- KeyValueMapper<Integer, String, String>
- mapper =
- new KeyValueMapper<Integer, String, String>() {
- @Override
- public String apply(Integer key, String value) {
- return value;
- }
- };
- stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
- groupedStream = stream
- .groupBy(
- mapper,
- Serdes.String(),
- Serdes.String());
-
- reducer = new Reducer<String>() {
- @Override
- public String apply(String value1, String value2) {
- return value1 + ":" + value2;
- }
- };
- initializer = new Initializer<Integer>() {
- @Override
- public Integer apply() {
- return 0;
- }
- };
- aggregator = new Aggregator<String, String, Integer>() {
- @Override
- public Integer apply(String aggKey, String value, Integer aggregate) {
- return aggregate + value.length();
- }
- };
- }
-
- @After
- public void whenShuttingDown() throws IOException {
- if (kafkaStreams != null) {
- kafkaStreams.close();
- }
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- }
-
-
- @Test
- public void shouldReduce() throws Exception {
- produceMessages(System.currentTimeMillis());
- groupedStream
- .reduce(reducer, "reduce-by-key")
- .to(Serdes.String(), Serdes.String(), outputTopic);
-
- startStreams();
-
- produceMessages(System.currentTimeMillis());
-
- List<KeyValue<String, String>> results = receiveMessages(
- new StringDeserializer(),
- new StringDeserializer()
- , 10);
-
- Collections.sort(results, new Comparator<KeyValue<String, String>>() {
- @Override
- public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- });
-
- assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
- KeyValue.pair("A", "A:A"),
- KeyValue.pair("B", "B"),
- KeyValue.pair("B", "B:B"),
- KeyValue.pair("C", "C"),
- KeyValue.pair("C", "C:C"),
- KeyValue.pair("D", "D"),
- KeyValue.pair("D", "D:D"),
- KeyValue.pair("E", "E"),
- KeyValue.pair("E", "E:E"))));
- }
-
- @SuppressWarnings("unchecked")
- private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
- final KeyValue<K, V> o2) {
- final int keyComparison = o1.key.compareTo(o2.key);
- if (keyComparison == 0) {
- return o1.value.compareTo(o2.value);
- }
- return keyComparison;
- }
-
- @Test
- public void shouldReduceWindowed() throws Exception {
- long firstBatchTimestamp = System.currentTimeMillis() - 1000;
- produceMessages(firstBatchTimestamp);
- long secondBatchTimestamp = System.currentTimeMillis();
- produceMessages(secondBatchTimestamp);
- produceMessages(secondBatchTimestamp);
-
- groupedStream
- .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
- .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
- @Override
- public String apply(Windowed<String> windowedKey, String value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- })
- .to(Serdes.String(), Serdes.String(), outputTopic);
-
- startStreams();
-
- List<KeyValue<String, String>> windowedOutput = receiveMessages(
- new StringDeserializer(),
- new StringDeserializer()
- , 15);
-
- Comparator<KeyValue<String, String>>
- comparator =
- new Comparator<KeyValue<String, String>>() {
- @Override
- public int compare(final KeyValue<String, String> o1,
- final KeyValue<String, String> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- };
-
- Collections.sort(windowedOutput, comparator);
- long firstBatchWindow = firstBatchTimestamp / 500 * 500;
- long secondBatchWindow = secondBatchTimestamp / 500 * 500;
-
- assertThat(windowedOutput, is(
- Arrays.asList(
- new KeyValue<>("A@" + firstBatchWindow, "A"),
- new KeyValue<>("A@" + secondBatchWindow, "A"),
- new KeyValue<>("A@" + secondBatchWindow, "A:A"),
- new KeyValue<>("B@" + firstBatchWindow, "B"),
- new KeyValue<>("B@" + secondBatchWindow, "B"),
- new KeyValue<>("B@" + secondBatchWindow, "B:B"),
- new KeyValue<>("C@" + firstBatchWindow, "C"),
- new KeyValue<>("C@" + secondBatchWindow, "C"),
- new KeyValue<>("C@" + secondBatchWindow, "C:C"),
- new KeyValue<>("D@" + firstBatchWindow, "D"),
- new KeyValue<>("D@" + secondBatchWindow, "D"),
- new KeyValue<>("D@" + secondBatchWindow, "D:D"),
- new KeyValue<>("E@" + firstBatchWindow, "E"),
- new KeyValue<>("E@" + secondBatchWindow, "E"),
- new KeyValue<>("E@" + secondBatchWindow, "E:E")
- )
- ));
- }
-
- @Test
- public void shouldAggregate() throws Exception {
- produceMessages(System.currentTimeMillis());
- groupedStream.aggregate(
- initializer,
- aggregator,
- Serdes.Integer(),
- "aggregate-by-selected-key")
- .to(Serdes.String(), Serdes.Integer(), outputTopic);
-
- startStreams();
-
- produceMessages(System.currentTimeMillis());
-
- List<KeyValue<String, Integer>> results = receiveMessages(
- new StringDeserializer(),
- new IntegerDeserializer()
- , 10);
-
- Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
- @Override
- public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- });
-
- assertThat(results, is(Arrays.asList(
- KeyValue.pair("A", 1),
- KeyValue.pair("A", 2),
- KeyValue.pair("B", 1),
- KeyValue.pair("B", 2),
- KeyValue.pair("C", 1),
- KeyValue.pair("C", 2),
- KeyValue.pair("D", 1),
- KeyValue.pair("D", 2),
- KeyValue.pair("E", 1),
- KeyValue.pair("E", 2)
- )));
- }
-
- @Test
- public void shouldAggregateWindowed() throws Exception {
- long firstTimestamp = System.currentTimeMillis() - 1000;
- produceMessages(firstTimestamp);
- long secondTimestamp = System.currentTimeMillis();
- produceMessages(secondTimestamp);
- produceMessages(secondTimestamp);
-
- groupedStream.aggregate(
- initializer,
- aggregator,
- TimeWindows.of("aggregate-by-key-windowed", 500L),
- Serdes.Integer())
- .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
- @Override
- public String apply(Windowed<String> windowedKey, Integer value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- })
- .to(Serdes.String(), Serdes.Integer(), outputTopic);
-
- startStreams();
-
- List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
- new StringDeserializer(),
- new IntegerDeserializer()
- , 15);
-
- Comparator<KeyValue<String, Integer>>
- comparator =
- new Comparator<KeyValue<String, Integer>>() {
- @Override
- public int compare(final KeyValue<String, Integer> o1,
- final KeyValue<String, Integer> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- };
-
- Collections.sort(windowedMessages, comparator);
-
- long firstWindow = firstTimestamp / 500 * 500;
- long secondWindow = secondTimestamp / 500 * 500;
-
- assertThat(windowedMessages, is(
- Arrays.asList(
- new KeyValue<>("A@" + firstWindow, 1),
- new KeyValue<>("A@" + secondWindow, 1),
- new KeyValue<>("A@" + secondWindow, 2),
- new KeyValue<>("B@" + firstWindow, 1),
- new KeyValue<>("B@" + secondWindow, 1),
- new KeyValue<>("B@" + secondWindow, 2),
- new KeyValue<>("C@" + firstWindow, 1),
- new KeyValue<>("C@" + secondWindow, 1),
- new KeyValue<>("C@" + secondWindow, 2),
- new KeyValue<>("D@" + firstWindow, 1),
- new KeyValue<>("D@" + secondWindow, 1),
- new KeyValue<>("D@" + secondWindow, 2),
- new KeyValue<>("E@" + firstWindow, 1),
- new KeyValue<>("E@" + secondWindow, 1),
- new KeyValue<>("E@" + secondWindow, 2)
- )));
- }
-
- @Test
- public void shouldCount() throws Exception {
- produceMessages(System.currentTimeMillis());
-
- groupedStream.count("count-by-key")
- .to(Serdes.String(), Serdes.Long(), outputTopic);
-
- startStreams();
-
- produceMessages(System.currentTimeMillis());
-
- List<KeyValue<String, Long>> results = receiveMessages(
- new StringDeserializer(),
- new LongDeserializer()
- , 10);
- Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
- @Override
- public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- });
-
- assertThat(results, is(Arrays.asList(
- KeyValue.pair("A", 1L),
- KeyValue.pair("A", 2L),
- KeyValue.pair("B", 1L),
- KeyValue.pair("B", 2L),
- KeyValue.pair("C", 1L),
- KeyValue.pair("C", 2L),
- KeyValue.pair("D", 1L),
- KeyValue.pair("D", 2L),
- KeyValue.pair("E", 1L),
- KeyValue.pair("E", 2L)
- )));
- }
-
- @Test
- public void shouldGroupByKey() throws Exception {
- long timestamp = System.currentTimeMillis();
- produceMessages(timestamp);
- produceMessages(timestamp);
-
- stream.groupByKey(Serdes.Integer(), Serdes.String())
- .count(TimeWindows.of("count-windows", 500L))
- .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
- @Override
- public String apply(final Windowed<Integer> windowedKey, final Long value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- }).to(Serdes.String(), Serdes.Long(), outputTopic);
-
- startStreams();
-
- List<KeyValue<String, Long>> results = receiveMessages(
- new StringDeserializer(),
- new LongDeserializer()
- , 10);
- Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
- @Override
- public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
- return KGroupedStreamIntegrationTest.compare(o1, o2);
- }
- });
-
- long window = timestamp / 500 * 500;
- assertThat(results, is(Arrays.asList(
- KeyValue.pair("1@" + window, 1L),
- KeyValue.pair("1@" + window, 2L),
- KeyValue.pair("2@" + window, 1L),
- KeyValue.pair("2@" + window, 2L),
- KeyValue.pair("3@" + window, 1L),
- KeyValue.pair("3@" + window, 2L),
- KeyValue.pair("4@" + window, 1L),
- KeyValue.pair("4@" + window, 2L),
- KeyValue.pair("5@" + window, 1L),
- KeyValue.pair("5@" + window, 2L)
- )));
-
- }
-
-
- private void produceMessages(long timestamp)
- throws ExecutionException, InterruptedException {
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- streamOneInput,
- Arrays.asList(
- new KeyValue<>(1, "A"),
- new KeyValue<>(2, "B"),
- new KeyValue<>(3, "C"),
- new KeyValue<>(4, "D"),
- new KeyValue<>(5, "E")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
- timestamp);
- }
-
-
- private void createTopics() {
- streamOneInput = "stream-one-" + testNo;
- outputTopic = "output-" + testNo;
- CLUSTER.createTopic(streamOneInput, 3, 1);
- CLUSTER.createTopic(outputTopic);
- }
-
- private void startStreams() {
- kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
- kafkaStreams.start();
- }
-
-
- private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
- keyDeserializer,
- final Deserializer<V>
- valueDeserializer,
- final int numMessages)
- throws InterruptedException {
- final Properties consumerProperties = new Properties();
- consumerProperties
- .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
- testNo);
- consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- keyDeserializer.getClass().getName());
- consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- valueDeserializer.getClass().getName());
- return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
- outputTopic,
- numMessages, 60 * 1000);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
new file mode 100644
index 0000000..b91a907
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KStreamAggregationIntegrationTest {
+
+ @ClassRule
+ public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+ new EmbeddedSingleNodeKafkaCluster();
+ private static volatile int testNo = 0;
+ private KStreamBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+ private String streamOneInput;
+ private String outputTopic;
+ private KGroupedStream<String, String> groupedStream;
+ private Reducer<String> reducer;
+ private Initializer<Integer> initializer;
+ private Aggregator<String, String, Integer> aggregator;
+ private KStream<Integer, String> stream;
+
+
+ @Before
+ public void before() {
+ testNo++;
+ builder = new KStreamBuilder();
+ createTopics();
+ streamsConfiguration = new Properties();
+ String applicationId = "kgrouped-stream-test-" +
+ testNo;
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration
+ .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+
+ KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+ groupedStream = stream
+ .groupBy(
+ mapper,
+ Serdes.String(),
+ Serdes.String());
+
+ reducer = new Reducer<String>() {
+ @Override
+ public String apply(String value1, String value2) {
+ return value1 + ":" + value2;
+ }
+ };
+ initializer = new Initializer<Integer>() {
+ @Override
+ public Integer apply() {
+ return 0;
+ }
+ };
+ aggregator = new Aggregator<String, String, Integer>() {
+ @Override
+ public Integer apply(String aggKey, String value, Integer aggregate) {
+ return aggregate + value.length();
+ }
+ };
+ }
+
+ @After
+ public void whenShuttingDown() throws IOException {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+
+ @Test
+ public void shouldReduce() throws Exception {
+ produceMessages(System.currentTimeMillis());
+ groupedStream
+ .reduce(reducer, "reduce-by-key")
+ .to(Serdes.String(), Serdes.String(), outputTopic);
+
+ startStreams();
+
+ produceMessages(System.currentTimeMillis());
+
+ List<KeyValue<String, String>> results = receiveMessages(
+ new StringDeserializer(),
+ new StringDeserializer()
+ , 10);
+
+ Collections.sort(results, new Comparator<KeyValue<String, String>>() {
+ @Override
+ public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ });
+
+ assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
+ KeyValue.pair("A", "A:A"),
+ KeyValue.pair("B", "B"),
+ KeyValue.pair("B", "B:B"),
+ KeyValue.pair("C", "C"),
+ KeyValue.pair("C", "C:C"),
+ KeyValue.pair("D", "D"),
+ KeyValue.pair("D", "D:D"),
+ KeyValue.pair("E", "E"),
+ KeyValue.pair("E", "E:E"))));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
+ final KeyValue<K, V> o2) {
+ final int keyComparison = o1.key.compareTo(o2.key);
+ if (keyComparison == 0) {
+ return o1.value.compareTo(o2.value);
+ }
+ return keyComparison;
+ }
+
+ @Test
+ public void shouldReduceWindowed() throws Exception {
+ long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+ produceMessages(firstBatchTimestamp);
+ long secondBatchTimestamp = System.currentTimeMillis();
+ produceMessages(secondBatchTimestamp);
+ produceMessages(secondBatchTimestamp);
+
+ groupedStream
+ .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
+ .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+ @Override
+ public String apply(Windowed<String> windowedKey, String value) {
+ return windowedKey.key() + "@" + windowedKey.window().start();
+ }
+ })
+ .to(Serdes.String(), Serdes.String(), outputTopic);
+
+ startStreams();
+
+ List<KeyValue<String, String>> windowedOutput = receiveMessages(
+ new StringDeserializer(),
+ new StringDeserializer()
+ , 15);
+
+ Comparator<KeyValue<String, String>>
+ comparator =
+ new Comparator<KeyValue<String, String>>() {
+ @Override
+ public int compare(final KeyValue<String, String> o1,
+ final KeyValue<String, String> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ };
+
+ Collections.sort(windowedOutput, comparator);
+ long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+ long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+
+ assertThat(windowedOutput, is(
+ Arrays.asList(
+ new KeyValue<>("A@" + firstBatchWindow, "A"),
+ new KeyValue<>("A@" + secondBatchWindow, "A"),
+ new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+ new KeyValue<>("B@" + firstBatchWindow, "B"),
+ new KeyValue<>("B@" + secondBatchWindow, "B"),
+ new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+ new KeyValue<>("C@" + firstBatchWindow, "C"),
+ new KeyValue<>("C@" + secondBatchWindow, "C"),
+ new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+ new KeyValue<>("D@" + firstBatchWindow, "D"),
+ new KeyValue<>("D@" + secondBatchWindow, "D"),
+ new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+ new KeyValue<>("E@" + firstBatchWindow, "E"),
+ new KeyValue<>("E@" + secondBatchWindow, "E"),
+ new KeyValue<>("E@" + secondBatchWindow, "E:E")
+ )
+ ));
+ }
+
+ @Test
+ public void shouldAggregate() throws Exception {
+ produceMessages(System.currentTimeMillis());
+ groupedStream.aggregate(
+ initializer,
+ aggregator,
+ Serdes.Integer(),
+ "aggregate-by-selected-key")
+ .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+ startStreams();
+
+ produceMessages(System.currentTimeMillis());
+
+ List<KeyValue<String, Integer>> results = receiveMessages(
+ new StringDeserializer(),
+ new IntegerDeserializer()
+ , 10);
+
+ Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
+ @Override
+ public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ });
+
+ assertThat(results, is(Arrays.asList(
+ KeyValue.pair("A", 1),
+ KeyValue.pair("A", 2),
+ KeyValue.pair("B", 1),
+ KeyValue.pair("B", 2),
+ KeyValue.pair("C", 1),
+ KeyValue.pair("C", 2),
+ KeyValue.pair("D", 1),
+ KeyValue.pair("D", 2),
+ KeyValue.pair("E", 1),
+ KeyValue.pair("E", 2)
+ )));
+ }
+
+ @Test
+ public void shouldAggregateWindowed() throws Exception {
+ long firstTimestamp = System.currentTimeMillis() - 1000;
+ produceMessages(firstTimestamp);
+ long secondTimestamp = System.currentTimeMillis();
+ produceMessages(secondTimestamp);
+ produceMessages(secondTimestamp);
+
+ groupedStream.aggregate(
+ initializer,
+ aggregator,
+ TimeWindows.of("aggregate-by-key-windowed", 500L),
+ Serdes.Integer())
+ .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
+ @Override
+ public String apply(Windowed<String> windowedKey, Integer value) {
+ return windowedKey.key() + "@" + windowedKey.window().start();
+ }
+ })
+ .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+ startStreams();
+
+ List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+ new StringDeserializer(),
+ new IntegerDeserializer()
+ , 15);
+
+ Comparator<KeyValue<String, Integer>>
+ comparator =
+ new Comparator<KeyValue<String, Integer>>() {
+ @Override
+ public int compare(final KeyValue<String, Integer> o1,
+ final KeyValue<String, Integer> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ };
+
+ Collections.sort(windowedMessages, comparator);
+
+ long firstWindow = firstTimestamp / 500 * 500;
+ long secondWindow = secondTimestamp / 500 * 500;
+
+ assertThat(windowedMessages, is(
+ Arrays.asList(
+ new KeyValue<>("A@" + firstWindow, 1),
+ new KeyValue<>("A@" + secondWindow, 1),
+ new KeyValue<>("A@" + secondWindow, 2),
+ new KeyValue<>("B@" + firstWindow, 1),
+ new KeyValue<>("B@" + secondWindow, 1),
+ new KeyValue<>("B@" + secondWindow, 2),
+ new KeyValue<>("C@" + firstWindow, 1),
+ new KeyValue<>("C@" + secondWindow, 1),
+ new KeyValue<>("C@" + secondWindow, 2),
+ new KeyValue<>("D@" + firstWindow, 1),
+ new KeyValue<>("D@" + secondWindow, 1),
+ new KeyValue<>("D@" + secondWindow, 2),
+ new KeyValue<>("E@" + firstWindow, 1),
+ new KeyValue<>("E@" + secondWindow, 1),
+ new KeyValue<>("E@" + secondWindow, 2)
+ )));
+ }
+
+ @Test
+ public void shouldCount() throws Exception {
+ produceMessages(System.currentTimeMillis());
+
+ groupedStream.count("count-by-key")
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ startStreams();
+
+ produceMessages(System.currentTimeMillis());
+
+ List<KeyValue<String, Long>> results = receiveMessages(
+ new StringDeserializer(),
+ new LongDeserializer()
+ , 10);
+ Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+ @Override
+ public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ });
+
+ assertThat(results, is(Arrays.asList(
+ KeyValue.pair("A", 1L),
+ KeyValue.pair("A", 2L),
+ KeyValue.pair("B", 1L),
+ KeyValue.pair("B", 2L),
+ KeyValue.pair("C", 1L),
+ KeyValue.pair("C", 2L),
+ KeyValue.pair("D", 1L),
+ KeyValue.pair("D", 2L),
+ KeyValue.pair("E", 1L),
+ KeyValue.pair("E", 2L)
+ )));
+ }
+
+ @Test
+ public void shouldGroupByKey() throws Exception {
+ long timestamp = System.currentTimeMillis();
+ produceMessages(timestamp);
+ produceMessages(timestamp);
+
+ stream.groupByKey(Serdes.Integer(), Serdes.String())
+ .count(TimeWindows.of("count-windows", 500L))
+ .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
+ @Override
+ public String apply(final Windowed<Integer> windowedKey, final Long value) {
+ return windowedKey.key() + "@" + windowedKey.window().start();
+ }
+ }).to(Serdes.String(), Serdes.Long(), outputTopic);
+
+ startStreams();
+
+ List<KeyValue<String, Long>> results = receiveMessages(
+ new StringDeserializer(),
+ new LongDeserializer()
+ , 10);
+ Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+ @Override
+ public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+ return KStreamAggregationIntegrationTest.compare(o1, o2);
+ }
+ });
+
+ long window = timestamp / 500 * 500;
+ assertThat(results, is(Arrays.asList(
+ KeyValue.pair("1@" + window, 1L),
+ KeyValue.pair("1@" + window, 2L),
+ KeyValue.pair("2@" + window, 1L),
+ KeyValue.pair("2@" + window, 2L),
+ KeyValue.pair("3@" + window, 1L),
+ KeyValue.pair("3@" + window, 2L),
+ KeyValue.pair("4@" + window, 1L),
+ KeyValue.pair("4@" + window, 2L),
+ KeyValue.pair("5@" + window, 1L),
+ KeyValue.pair("5@" + window, 2L)
+ )));
+
+ }
+
+
+ private void produceMessages(long timestamp)
+ throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ streamOneInput,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(2, "B"),
+ new KeyValue<>(3, "C"),
+ new KeyValue<>(4, "D"),
+ new KeyValue<>(5, "E")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ timestamp);
+ }
+
+
+ private void createTopics() {
+ streamOneInput = "stream-one-" + testNo;
+ outputTopic = "output-" + testNo;
+ CLUSTER.createTopic(streamOneInput, 3, 1);
+ CLUSTER.createTopic(outputTopic);
+ }
+
+ private void startStreams() {
+ kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+ kafkaStreams.start();
+ }
+
+
+ private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+ keyDeserializer,
+ final Deserializer<V>
+ valueDeserializer,
+ final int numMessages)
+ throws InterruptedException {
+ final Properties consumerProperties = new Properties();
+ consumerProperties
+ .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
+ testNo);
+ consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ keyDeserializer.getClass().getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ valueDeserializer.getClass().getName());
+ return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
+ outputTopic,
+ numMessages, 60 * 1000);
+
+ }
+
+}
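One arithmetic detail in the windowed assertions above: the expected keys are built by flooring each timestamp to its 500 ms window start, which is what the long division followed by multiplication does. A minimal worked example (the timestamp value is hypothetical):

// Window-start arithmetic as used in the test above: long division
// truncates, so dividing and multiplying back floors to the boundary.
long timestamp = 1468267022123L;           // example event time in ms (hypothetical)
long windowStart = timestamp / 500 * 500;  // == 1468267022000L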
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..b7d4fc3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that demonstrates how to perform a join between a KStream and a
+ * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
+ */
+public class KStreamKTableJoinIntegrationTest {
+ @ClassRule
+ public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+ private static final String USER_CLICKS_TOPIC = "user-clicks";
+ private static final String USER_REGIONS_TOPIC = "user-regions";
+ private static final String OUTPUT_TOPIC = "output-topic";
+
+ @BeforeClass
+ public static void startKafkaCluster() throws Exception {
+ CLUSTER.createTopic(USER_CLICKS_TOPIC);
+ CLUSTER.createTopic(USER_REGIONS_TOPIC);
+ CLUSTER.createTopic(OUTPUT_TOPIC);
+ }
+
+ /**
+ * Tuple for a region and its associated number of clicks.
+ */
+ private static final class RegionWithClicks {
+
+ private final String region;
+ private final long clicks;
+
+ public RegionWithClicks(String region, long clicks) {
+ if (region == null || region.isEmpty()) {
+ throw new IllegalArgumentException("region must be set");
+ }
+ if (clicks < 0) {
+ throw new IllegalArgumentException("clicks must not be negative");
+ }
+ this.region = region;
+ this.clicks = clicks;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public long getClicks() {
+ return clicks;
+ }
+
+ }
+
+ @Test
+ public void shouldCountClicksPerRegion() throws Exception {
+ // Input 1: Clicks per user (multiple records allowed per user).
+ List<KeyValue<String, Long>> userClicks = Arrays.asList(
+ new KeyValue<>("alice", 13L),
+ new KeyValue<>("bob", 4L),
+ new KeyValue<>("chao", 25L),
+ new KeyValue<>("bob", 19L),
+ new KeyValue<>("dave", 56L),
+ new KeyValue<>("eve", 78L),
+ new KeyValue<>("alice", 40L),
+ new KeyValue<>("fang", 99L)
+ );
+
+ // Input 2: Region per user (multiple records allowed per user).
+ List<KeyValue<String, String>> userRegions = Arrays.asList(
+ new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */
+ new KeyValue<>("bob", "americas"),
+ new KeyValue<>("chao", "asia"),
+ new KeyValue<>("dave", "europe"),
+ new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
+ new KeyValue<>("eve", "americas"),
+ new KeyValue<>("fang", "asia")
+ );
+
+ List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+ new KeyValue<>("europe", 13L),
+ new KeyValue<>("americas", 4L),
+ new KeyValue<>("asia", 25L),
+ new KeyValue<>("americas", 23L),
+ new KeyValue<>("europe", 69L),
+ new KeyValue<>("americas", 101L),
+ new KeyValue<>("europe", 109L),
+ new KeyValue<>("asia", 124L)
+ );
+
+ //
+ // Step 1: Configure and start the processor topology.
+ //
+ final Serde<String> stringSerde = Serdes.String();
+ final Serde<Long> longSerde = Serdes.Long();
+
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+ TestUtils.tempDirectory().getPath());
+
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ // This KStream contains information such as "alice" -> 13L.
+ //
+ // Because this is a KStream ("record stream"), multiple records for the same user will be
+ // considered as separate click-count events, each of which will be added to the total count.
+ KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+
+ // This KTable contains information such as "alice" -> "europe".
+ //
+ // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
+ // record key will be considered at the time when a new user-click record (see above) is
+ // received for the `leftJoin` below. Any previous region values are being considered out of
+ // date. This behavior is quite different to the KStream for user clicks above.
+ //
+ // For example, the user "alice" will be considered to live in "europe" (although originally she
+ // lived in "asia") because, at the time her first user-click record is being received and
+ // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
+ // (which overrides her previous region value of "asia").
+ KTable<String, String> userRegionsTable =
+ builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
+
+ // Compute the number of clicks per region, e.g. "europe" -> 13L.
+ //
+ // The resulting KTable is continuously being updated as new data records are arriving in the
+ // input KStream `userClicksStream` and input KTable `userRegionsTable`.
+ KTable<String, Long> clicksPerRegion = userClicksStream
+ // Join the stream against the table.
+ //
+ // Null values possible: in general the region (i.e. the value side of the KTable we are
+ // joining against) may be null, so we must guard against that (here: with the fallback
+ // region "UNKNOWN"). In this specific test it is not strictly needed, because the test
+ // setup guarantees that every user has a region entry by the time the join runs.
+ //
+ // Also, we need to return a tuple of (region, clicks) for each user, but since Java does
+ // not support tuples out of the box, we use the custom class `RegionWithClicks` to the
+ // same effect.
+ .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
+ @Override
+ public RegionWithClicks apply(Long clicks, String region) {
+ return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
+ }
+ })
+ // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+ .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
+ @Override
+ public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+ return new KeyValue<>(value.getRegion(), value.getClicks());
+ }
+ })
+ // Compute the total per region by summing the individual click counts per region.
+ .groupByKey(stringSerde, longSerde)
+ .reduce(new Reducer<Long>() {
+ @Override
+ public Long apply(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ }, "ClicksPerRegionUnwindowed");
+
+ // Write the (continuously updating) results to the output topic.
+ clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ //
+ // Step 2: Publish user-region information.
+ //
+ // To keep this test simple and easy to reason about, we publish all user-region records
+ // before any user-click records (cf. step 3). In practice, records would typically arrive
+ // concurrently on both input topics.
+ Properties userRegionsProducerConfig = new Properties();
+ userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+
+ //
+ // Step 3: Publish some user click events.
+ //
+ Properties userClicksProducerConfig = new Properties();
+ userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+
+ //
+ // Step 4: Verify the application's output data.
+ //
+ Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+ OUTPUT_TOPIC, expectedClicksPerRegion.size());
+ streams.close();
+ assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+ }
+
+}
\ No newline at end of file
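
To see why `expectedClicksPerRegion` above lists a running total per region (region keys repeat in the list), the topology can be replayed by hand: each incoming click is joined against the user's latest region and emits one updated (region, total) pair. A minimal plain-Java sketch, assuming the test's full `userClicks` list and imports of java.util.HashMap, java.util.Map and org.apache.kafka.streams.KeyValue; the names `latestRegion` and `totals` are illustrative, not part of the test:

    // KTable semantics: only the latest region per user matters, so "alice"
    // resolves to "europe" even though her first region record was "asia".
    Map<String, String> latestRegion = new HashMap<>();
    latestRegion.put("alice", "europe");
    latestRegion.put("bob", "americas");
    latestRegion.put("chao", "asia");
    latestRegion.put("dave", "europe");
    latestRegion.put("eve", "americas");
    latestRegion.put("fang", "asia");

    Map<String, Long> totals = new HashMap<>();
    for (KeyValue<String, Long> click : userClicks) {
        // leftJoin step: fall back to "UNKNOWN" when a user has no region entry.
        String region = latestRegion.containsKey(click.key)
            ? latestRegion.get(click.key) : "UNKNOWN";
        // reduce step: add this click count to the region's running total.
        Long previous = totals.get(region);
        long total = (previous == null ? 0L : previous) + click.value;
        totals.put(region, total);
        // Every click produces one KTable update, hence one output record.
        System.out.println(new KeyValue<>(region, total));
    }

In input order these updates correspond one-to-one to the entries of `expectedClicksPerRegion`.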
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 9aaafe6..434216e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -59,8 +60,7 @@ public class KStreamRepartitionJoinTest {
private KStream<Integer, String> streamTwo;
private KStream<Integer, String> streamFour;
private ValueJoiner<Integer, String, String> valueJoiner;
- private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
- keyMapper;
+ private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
private final List<String>
expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
@@ -77,16 +77,13 @@ public class KStreamRepartitionJoinTest {
builder = new KStreamBuilder();
createTopics();
streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
- applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
@@ -98,12 +95,7 @@ public class KStreamRepartitionJoinTest {
}
};
- keyMapper = new KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>() {
- @Override
- public KeyValue<Integer, Integer> apply(final Long key, final Integer value) {
- return new KeyValue<>(value, value);
- }
- };
+ keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper();
}
@After
@@ -146,19 +138,8 @@ public class KStreamRepartitionJoinTest {
}
private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
-
- final KStream<Integer, Integer>
- map1 =
- streamOne.map(keyMapper);
-
- final KStream<Integer, String> map2 = streamTwo.map(
- new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
- @Override
- public KeyValue<Integer, String> apply(Integer key,
- String value) {
- return new KeyValue<>(key, value);
- }
- });
+ final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
+ final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
@@ -185,14 +166,8 @@ public class KStreamRepartitionJoinTest {
public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
- final KStream<Integer, Integer>
- keySelected =
- streamOne.selectKey(new KeyValueMapper<Long, Integer, Integer>() {
- @Override
- public Integer apply(final Long key, final Integer value) {
- return value;
- }
- });
+ final KStream<Integer, Integer> keySelected =
+ streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
String outputTopic = "select-key-join";
doJoin(keySelected, streamTwo, outputTopic, outputTopic);
@@ -239,18 +214,9 @@ public class KStreamRepartitionJoinTest {
}
public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
- final KStream<Integer, Integer>
- map1 =
- streamOne.map(keyMapper);
+ final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
- final KStream<Integer, String> map2 = streamTwo.map(
- new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
- @Override
- public KeyValue<Integer, String> apply(Integer key,
- String value) {
- return new KeyValue<>(key, value);
- }
- });
+ final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
String outputTopic = "left-join";
map1.leftJoin(map2,
@@ -266,19 +232,10 @@ public class KStreamRepartitionJoinTest {
private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws
Exception {
- final KStream<Integer, Integer>
- map1 =
- streamOne.map(keyMapper);
+ final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
- kvMapper =
- new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
- @Override
- public KeyValue<Integer, String> apply(Integer key,
- String value) {
- return new KeyValue<>(key, value);
- }
- };
+ kvMapper = MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper();
final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
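
The `MockKeyValueMapper` factory methods referenced above replace the anonymous `KeyValueMapper` classes that were removed. Their authoritative definitions are in this commit's MockKeyValueMapper.java diff; reconstructed from the call sites, they plausibly look like the following sketch (illustrative only):

    // No-op: keep key and value as they are.
    public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
        return new KeyValueMapper<K, V, KeyValue<K, V>>() {
            @Override
            public KeyValue<K, V> apply(K key, V value) {
                return new KeyValue<>(key, value);
            }
        };
    }

    // Re-key the record on its value, keeping the value as well.
    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper() {
        return new KeyValueMapper<K, V, KeyValue<V, V>>() {
            @Override
            public KeyValue<V, V> apply(K key, V value) {
                return new KeyValue<>(value, value);
            }
        };
    }

    // For selectKey(): choose the value as the new key.
    public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
        return new KeyValueMapper<K, V, V>() {
            @Override
            public V apply(K key, V value) {
                return value;
            }
        };
    }

These shapes match the call sites, e.g. `MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper()` yields a `KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>` suitable for `streamOne.map(...)`.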
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
deleted file mode 100644
index 2096d9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test based on a simple map, using an embedded Kafka cluster.
- */
-public class MapFunctionIntegrationTest {
- @ClassRule
- public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
- private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
- private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
-
- @BeforeClass
- public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
- CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
- }
-
- @Test
- public void shouldUppercaseTheInput() throws Exception {
- List<String> inputValues = Arrays.asList("hello", "world");
- List<String> expectedValues = new ArrayList<>();
- for (String input : inputValues) {
- expectedValues.add(input.toUpperCase(Locale.getDefault()));
- }
-
- //
- // Step 1: Configure and start the processor topology.
- //
- KStreamBuilder builder = new KStreamBuilder();
-
- Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-integration-test");
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
- streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
- streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
- KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
- @Override
- public String apply(String value) {
- return value.toUpperCase(Locale.getDefault());
- }
- });
- uppercased.to(DEFAULT_OUTPUT_TOPIC);
-
- KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
- streams.start();
-
- //
- // Step 2: Produce some input data to the input topic.
- //
- Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
-
- //
- // Step 3: Verify the application's output data.
- //
- Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-integration-test-standard-consumer");
- consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
- DEFAULT_OUTPUT_TOPIC, inputValues.size());
- streams.close();
- assertThat(actualValues, equalTo(expectedValues));
- }
-
-}
\ No newline at end of file
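
For reference, the core logic of the deleted test (uppercasing values via `mapValues`) does not need an embedded cluster to verify; the commit removes the test because other integration tests already cover this path. A minimal sketch, assuming the deleted file's own imports (`ValueMapper`, `Locale`, hamcrest's `assertThat`/`equalTo`):

    ValueMapper<String, String> uppercase = new ValueMapper<String, String>() {
        @Override
        public String apply(String value) {
            return value.toUpperCase(Locale.getDefault());
        }
    };
    // The mapper can be asserted directly, no cluster or topology required.
    assertThat(uppercase.apply("hello"), equalTo("HELLO"));
    assertThat(uppercase.apply("world"), equalTo("WORLD"));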