Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/11 20:57:07 UTC

[2/2] kafka git commit: KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates

KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates

Also made a pass over the streams unit tests, with the following changes:

1. Removed three integration tests as they are already covered by other integration tests.
2. Merged `KGroupedTableImplTest` into `KTableAggregateTest`.
3. Used mocks wherever possible to reduce code duplication (see the `MockKeyValueMapper` sketch after the first diff hunk below).
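
One plausible reading of the null-checking contract named in the title, as a standalone sketch (hypothetical code, not part of this commit or of Kafka itself): KTable aggregates should silently skip records whose key is null instead of failing with a NullPointerException.

    // Hypothetical sketch of the asserted behavior: a null key is dropped,
    // not aggregated and not an error.
    import java.util.HashMap;
    import java.util.Map;

    public class NullKeySkipSketch {
        // Apply one (key, value) update to the running totals, skipping null
        // keys the way KTable aggregates are presumed to.
        static void process(Map<String, Long> totals, String key, long value) {
            if (key == null) {
                return; // skipped silently: no update, no exception
            }
            Long old = totals.get(key);
            totals.put(key, (old == null ? 0L : old) + value);
        }

        public static void main(String[] args) {
            Map<String, Long> totals = new HashMap<>();
            process(totals, "A", 1L);
            process(totals, null, 5L); // must be ignored
            process(totals, "A", 2L);
            if (totals.get("A") != 3L || totals.size() != 1) {
                throw new AssertionError("null key was not skipped: " + totals);
            }
            System.out.println(totals); // prints {A=3}
        }
    }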

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1604 from guozhangwang/Kminor-unit-tests-consolidation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/136a8fab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/136a8fab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/136a8fab

Branch: refs/heads/trunk
Commit: 136a8fabca8e266f67897cf5471b2e41c0a341be
Parents: c439268
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jul 11 13:57:02 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 11 13:57:02 2016 -0700

----------------------------------------------------------------------
 .../InternalTopicIntegrationTest.java           |  14 +-
 .../integration/JoinIntegrationTest.java        | 259 ----------
 .../KGroupedStreamIntegrationTest.java          | 472 -------------------
 .../KStreamAggregationIntegrationTest.java      | 466 ++++++++++++++++++
 .../KStreamKTableJoinIntegrationTest.java       | 258 ++++++++++
 .../integration/KStreamRepartitionJoinTest.java |  69 +--
 .../integration/MapFunctionIntegrationTest.java | 122 -----
 .../integration/PassThroughIntegrationTest.java | 108 -----
 .../integration/WordCountIntegrationTest.java   | 154 ------
 .../internals/KGroupedTableImplTest.java        |  79 ----
 .../kstream/internals/KTableAggregateTest.java  |  57 ++-
 .../kstream/internals/KTableSourceTest.java     |   4 +-
 .../apache/kafka/test/MockKeyValueMapper.java   |  23 +-
 13 files changed, 808 insertions(+), 1277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 15469c7..968e060 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -24,13 +24,15 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -41,8 +43,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import kafka.admin.AdminUtils;
 import kafka.log.LogConfig;
 import kafka.utils.ZKStringSerializer$;
@@ -135,12 +135,8 @@ public class InternalTopicIntegrationTest {
                 public Iterable<String> apply(String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
-            }).groupBy(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(String key, String value) {
-                    return value;
-                }
-            }).count("Counts").toStream();
+            }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
 

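For context on the hunk above: the anonymous KeyValueMapper that returned the record value is replaced by the shared test helper MockKeyValueMapper.SelectValueMapper(). The real helper lives in org.apache.kafka.test and is modified elsewhere in this commit (see the diffstat); a minimal sketch of what it presumably provides:

    package org.apache.kafka.test;

    import org.apache.kafka.streams.kstream.KeyValueMapper;

    // Sketch only -- the full class is not shown in this excerpt.
    public final class MockKeyValueMapper {

        private MockKeyValueMapper() {}

        // A reusable mapper that selects the record value as the new grouping
        // key, mirroring the anonymous class deleted in the hunk above.
        public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
            return new KeyValueMapper<K, V, V>() {
                @Override
                public V apply(K key, V value) {
                    return value;
                }
            };
        }
    }
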
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index f99a142..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.kafka.streams.integration;
-
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test that demonstrates how to perform a join between a KStream and a
- * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
- */
-public class JoinIntegrationTest {
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
-    private static final String USER_CLICKS_TOPIC = "user-clicks";
-    private static final String USER_REGIONS_TOPIC = "user-regions";
-    private static final String OUTPUT_TOPIC = "output-topic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(USER_CLICKS_TOPIC);
-        CLUSTER.createTopic(USER_REGIONS_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
-    }
-
-    /**
-     * Tuple for a region and its associated number of clicks.
-     */
-    private static final class RegionWithClicks {
-
-        private final String region;
-        private final long clicks;
-
-        public RegionWithClicks(String region, long clicks) {
-            if (region == null || region.isEmpty()) {
-                throw new IllegalArgumentException("region must be set");
-            }
-            if (clicks < 0) {
-                throw new IllegalArgumentException("clicks must not be negative");
-            }
-            this.region = region;
-            this.clicks = clicks;
-        }
-
-        public String getRegion() {
-            return region;
-        }
-
-        public long getClicks() {
-            return clicks;
-        }
-
-    }
-
-    @Test
-    public void shouldCountClicksPerRegion() throws Exception {
-        // Input 1: Clicks per user (multiple records allowed per user).
-        List<KeyValue<String, Long>> userClicks = Arrays.asList(
-            new KeyValue<>("alice", 13L),
-            new KeyValue<>("bob", 4L),
-            new KeyValue<>("chao", 25L),
-            new KeyValue<>("bob", 19L),
-            new KeyValue<>("dave", 56L),
-            new KeyValue<>("eve", 78L),
-            new KeyValue<>("alice", 40L),
-            new KeyValue<>("fang", 99L)
-        );
-
-        // Input 2: Region per user (multiple records allowed per user).
-        List<KeyValue<String, String>> userRegions = Arrays.asList(
-            new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
-            new KeyValue<>("bob", "americas"),
-            new KeyValue<>("chao", "asia"),
-            new KeyValue<>("dave", "europe"),
-            new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
-            new KeyValue<>("eve", "americas"),
-            new KeyValue<>("fang", "asia")
-        );
-
-        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
-            new KeyValue<>("europe", 13L),
-            new KeyValue<>("americas", 4L),
-            new KeyValue<>("asia", 25L),
-            new KeyValue<>("americas", 23L),
-            new KeyValue<>("europe", 69L),
-            new KeyValue<>("americas", 101L),
-            new KeyValue<>("europe", 109L),
-            new KeyValue<>("asia", 124L)
-        );
-
-        //
-        // Step 1: Configure and start the processor topology.
-        //
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-                                 TestUtils.tempDirectory().getPath());
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        // This KStream contains information such as "alice" -> 13L.
-        //
-        // Because this is a KStream ("record stream"), multiple records for the same user will be
-        // considered as separate click-count events, each of which will be added to the total count.
-        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
-
-        // This KTable contains information such as "alice" -> "europe".
-        //
-        // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
-        // record key will be considered at the time when a new user-click record (see above) is
-        // received for the `leftJoin` below.  Any previous region values are being considered out of
-        // date.  This behavior is quite different to the KStream for user clicks above.
-        //
-        // For example, the user "alice" will be considered to live in "europe" (although originally she
-        // lived in "asia") because, at the time her first user-click record is being received and
-        // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
-        // (which overrides her previous region value of "asia").
-        KTable<String, String> userRegionsTable =
-            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
-
-        // Compute the number of clicks per region, e.g. "europe" -> 13L.
-        //
-        // The resulting KTable is continuously being updated as new data records are arriving in the
-        // input KStream `userClicksStream` and input KTable `userRegionsTable`.
-        KTable<String, Long> clicksPerRegion = userClicksStream
-            // Join the stream against the table.
-            //
-            // Null values possible: In general, null values are possible for region (i.e. the value of
-            // the KTable we are joining against) so we must guard against that (here: by setting the
-            // fallback region "UNKNOWN").  In this specific example this is not really needed because
-            // we know, based on the test setup, that all users have appropriate region entries at the
-            // time we perform the join.
-            //
-            // Also, we need to return a tuple of (region, clicks) for each user.  But because Java does
-            // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
-            // achieve the same effect.
-            .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
-                @Override
-                public RegionWithClicks apply(Long clicks, String region) {
-                    RegionWithClicks regionWithClicks = new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
-                    return regionWithClicks;
-                }
-            })
-            // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
-            .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
-                    return new KeyValue<>(value.getRegion(), value.getClicks());
-                }
-            })
-            // Compute the total per region by summing the individual click counts per region.
-            .groupByKey(stringSerde, longSerde)
-            .reduce(new Reducer<Long>() {
-                @Override
-                public Long apply(Long value1, Long value2) {
-                    return value1 + value2;
-                }
-            }, "ClicksPerRegionUnwindowed");
-
-        // Write the (continuously updating) results to the output topic.
-        clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        //
-        // Step 2: Publish user-region information.
-        //
-        // To keep this code example simple and easier to understand/reason about, we publish all
-        // user-region records before any user-click records (cf. step 3).  In practice though,
-        // data records would typically be arriving concurrently in both input streams/topics.
-        Properties userRegionsProducerConfig = new Properties();
-        userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
-
-        //
-        // Step 3: Publish some user click events.
-        //
-        Properties userClicksProducerConfig = new Properties();
-        userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
-
-        //
-        // Step 4: Verify the application's output data.
-        //
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-            OUTPUT_TOPIC, expectedClicksPerRegion.size());
-        streams.close();
-        assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
deleted file mode 100644
index 36340b9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-public class KGroupedStreamIntegrationTest {
-
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
-        new EmbeddedSingleNodeKafkaCluster();
-    private static volatile int testNo = 0;
-    private KStreamBuilder builder;
-    private Properties streamsConfiguration;
-    private KafkaStreams kafkaStreams;
-    private String streamOneInput;
-    private String outputTopic;
-    private KGroupedStream<String, String> groupedStream;
-    private Reducer<String> reducer;
-    private Initializer<Integer> initializer;
-    private Aggregator<String, String, Integer> aggregator;
-    private KStream<Integer, String> stream;
-
-
-    @Before
-    public void before() {
-        testNo++;
-        builder = new KStreamBuilder();
-        createTopics();
-        streamsConfiguration = new Properties();
-        String applicationId = "kgrouped-stream-test-" +
-                       testNo;
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-
-        KeyValueMapper<Integer, String, String>
-            mapper =
-            new KeyValueMapper<Integer, String, String>() {
-                @Override
-                public String apply(Integer key, String value) {
-                    return value;
-                }
-            };
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
-        groupedStream = stream
-            .groupBy(
-                mapper,
-                Serdes.String(),
-                Serdes.String());
-
-        reducer = new Reducer<String>() {
-            @Override
-            public String apply(String value1, String value2) {
-                return value1 + ":" + value2;
-            }
-        };
-        initializer = new Initializer<Integer>() {
-            @Override
-            public Integer apply() {
-                return 0;
-            }
-        };
-        aggregator = new Aggregator<String, String, Integer>() {
-            @Override
-            public Integer apply(String aggKey, String value, Integer aggregate) {
-                return aggregate + value.length();
-            }
-        };
-    }
-
-    @After
-    public void whenShuttingDown() throws IOException {
-        if (kafkaStreams != null) {
-            kafkaStreams.close();
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-    }
-
-
-    @Test
-    public void shouldReduce() throws Exception {
-        produceMessages(System.currentTimeMillis());
-        groupedStream
-            .reduce(reducer, "reduce-by-key")
-            .to(Serdes.String(), Serdes.String(), outputTopic);
-
-        startStreams();
-
-        produceMessages(System.currentTimeMillis());
-
-        List<KeyValue<String, String>> results = receiveMessages(
-            new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
-
-        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
-            @Override
-            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
-                return KGroupedStreamIntegrationTest.compare(o1, o2);
-            }
-        });
-
-        assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
-                                             KeyValue.pair("A", "A:A"),
-                                             KeyValue.pair("B", "B"),
-                                             KeyValue.pair("B", "B:B"),
-                                             KeyValue.pair("C", "C"),
-                                             KeyValue.pair("C", "C:C"),
-                                             KeyValue.pair("D", "D"),
-                                             KeyValue.pair("D", "D:D"),
-                                             KeyValue.pair("E", "E"),
-                                             KeyValue.pair("E", "E:E"))));
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
-                                                                            final KeyValue<K, V> o2) {
-        final int keyComparison = o1.key.compareTo(o2.key);
-        if (keyComparison == 0) {
-            return o1.value.compareTo(o2.value);
-        }
-        return keyComparison;
-    }
-
-    @Test
-    public void shouldReduceWindowed() throws Exception {
-        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
-        produceMessages(firstBatchTimestamp);
-        long secondBatchTimestamp = System.currentTimeMillis();
-        produceMessages(secondBatchTimestamp);
-        produceMessages(secondBatchTimestamp);
-
-        groupedStream
-            .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
-            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
-                @Override
-                public String apply(Windowed<String> windowedKey, String value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            })
-            .to(Serdes.String(), Serdes.String(), outputTopic);
-
-        startStreams();
-
-        List<KeyValue<String, String>> windowedOutput = receiveMessages(
-            new StringDeserializer(),
-            new StringDeserializer()
-            , 15);
-
-        Comparator<KeyValue<String, String>>
-            comparator =
-            new Comparator<KeyValue<String, String>>() {
-                @Override
-                public int compare(final KeyValue<String, String> o1,
-                                   final KeyValue<String, String> o2) {
-                    return KGroupedStreamIntegrationTest.compare(o1, o2);
-                }
-            };
-
-        Collections.sort(windowedOutput, comparator);
-        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
-
-        assertThat(windowedOutput, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
-                new KeyValue<>("B@" + firstBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
-                new KeyValue<>("C@" + firstBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
-                new KeyValue<>("D@" + firstBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
-                new KeyValue<>("E@" + firstBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E:E")
-            )
-        ));
-    }
-
-    @Test
-    public void shouldAggregate() throws Exception {
-        produceMessages(System.currentTimeMillis());
-        groupedStream.aggregate(
-            initializer,
-            aggregator,
-            Serdes.Integer(),
-            "aggregate-by-selected-key")
-            .to(Serdes.String(), Serdes.Integer(), outputTopic);
-
-        startStreams();
-
-        produceMessages(System.currentTimeMillis());
-
-        List<KeyValue<String, Integer>> results = receiveMessages(
-            new StringDeserializer(),
-            new IntegerDeserializer()
-            , 10);
-
-        Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
-            @Override
-            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
-                return KGroupedStreamIntegrationTest.compare(o1, o2);
-            }
-        });
-
-        assertThat(results, is(Arrays.asList(
-            KeyValue.pair("A", 1),
-            KeyValue.pair("A", 2),
-            KeyValue.pair("B", 1),
-            KeyValue.pair("B", 2),
-            KeyValue.pair("C", 1),
-            KeyValue.pair("C", 2),
-            KeyValue.pair("D", 1),
-            KeyValue.pair("D", 2),
-            KeyValue.pair("E", 1),
-            KeyValue.pair("E", 2)
-        )));
-    }
-
-    @Test
-    public void shouldAggregateWindowed() throws Exception {
-        long firstTimestamp = System.currentTimeMillis() - 1000;
-        produceMessages(firstTimestamp);
-        long secondTimestamp = System.currentTimeMillis();
-        produceMessages(secondTimestamp);
-        produceMessages(secondTimestamp);
-
-        groupedStream.aggregate(
-            initializer,
-            aggregator,
-            TimeWindows.of("aggregate-by-key-windowed", 500L),
-            Serdes.Integer())
-            .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
-                @Override
-                public String apply(Windowed<String> windowedKey, Integer value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            })
-            .to(Serdes.String(), Serdes.Integer(), outputTopic);
-
-        startStreams();
-
-        List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
-            new StringDeserializer(),
-            new IntegerDeserializer()
-            , 15);
-
-        Comparator<KeyValue<String, Integer>>
-            comparator =
-            new Comparator<KeyValue<String, Integer>>() {
-                @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KGroupedStreamIntegrationTest.compare(o1, o2);
-                }
-            };
-
-        Collections.sort(windowedMessages, comparator);
-
-        long firstWindow = firstTimestamp / 500 * 500;
-        long secondWindow = secondTimestamp / 500 * 500;
-
-        assertThat(windowedMessages, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
-            )));
-    }
-
-    @Test
-    public void shouldCount() throws Exception {
-        produceMessages(System.currentTimeMillis());
-
-        groupedStream.count("count-by-key")
-            .to(Serdes.String(), Serdes.Long(), outputTopic);
-
-        startStreams();
-
-        produceMessages(System.currentTimeMillis());
-
-        List<KeyValue<String, Long>> results = receiveMessages(
-            new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
-        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
-                return KGroupedStreamIntegrationTest.compare(o1, o2);
-            }
-        });
-
-        assertThat(results, is(Arrays.asList(
-            KeyValue.pair("A", 1L),
-            KeyValue.pair("A", 2L),
-            KeyValue.pair("B", 1L),
-            KeyValue.pair("B", 2L),
-            KeyValue.pair("C", 1L),
-            KeyValue.pair("C", 2L),
-            KeyValue.pair("D", 1L),
-            KeyValue.pair("D", 2L),
-            KeyValue.pair("E", 1L),
-            KeyValue.pair("E", 2L)
-        )));
-    }
-
-    @Test
-    public void shouldGroupByKey() throws Exception {
-        long timestamp = System.currentTimeMillis();
-        produceMessages(timestamp);
-        produceMessages(timestamp);
-
-        stream.groupByKey(Serdes.Integer(), Serdes.String())
-            .count(TimeWindows.of("count-windows", 500L))
-            .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
-                @Override
-                public String apply(final Windowed<Integer> windowedKey, final Long value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            }).to(Serdes.String(), Serdes.Long(), outputTopic);
-
-        startStreams();
-
-        List<KeyValue<String, Long>> results = receiveMessages(
-            new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
-        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
-                return KGroupedStreamIntegrationTest.compare(o1, o2);
-            }
-        });
-
-        long window = timestamp / 500 * 500;
-        assertThat(results, is(Arrays.asList(
-            KeyValue.pair("1@" + window, 1L),
-            KeyValue.pair("1@" + window, 2L),
-            KeyValue.pair("2@" + window, 1L),
-            KeyValue.pair("2@" + window, 2L),
-            KeyValue.pair("3@" + window, 1L),
-            KeyValue.pair("3@" + window, 2L),
-            KeyValue.pair("4@" + window, 1L),
-            KeyValue.pair("4@" + window, 2L),
-            KeyValue.pair("5@" + window, 1L),
-            KeyValue.pair("5@" + window, 2L)
-        )));
-
-    }
-
-
-    private void produceMessages(long timestamp)
-        throws ExecutionException, InterruptedException {
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            streamOneInput,
-            Arrays.asList(
-                new KeyValue<>(1, "A"),
-                new KeyValue<>(2, "B"),
-                new KeyValue<>(3, "C"),
-                new KeyValue<>(4, "D"),
-                new KeyValue<>(5, "E")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            timestamp);
-    }
-
-
-    private void createTopics() {
-        streamOneInput = "stream-one-" + testNo;
-        outputTopic = "output-" + testNo;
-        CLUSTER.createTopic(streamOneInput, 3, 1);
-        CLUSTER.createTopic(outputTopic);
-    }
-
-    private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
-        kafkaStreams.start();
-    }
-
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
-                                                        final int numMessages)
-        throws InterruptedException {
-        final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
-                                                                       testNo);
-        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                                       keyDeserializer.getClass().getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                                       valueDeserializer.getClass().getName());
-        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
-                                                                        outputTopic,
-                                                                        numMessages, 60 * 1000);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
new file mode 100644
index 0000000..b91a907
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KStreamAggregationIntegrationTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+    private static volatile int testNo = 0;
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String outputTopic;
+    private KGroupedStream<String, String> groupedStream;
+    private Reducer<String> reducer;
+    private Initializer<Integer> initializer;
+    private Aggregator<String, String, Integer> aggregator;
+    private KStream<Integer, String> stream;
+
+
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        String applicationId = "kgrouped-stream-test-" +
+                       testNo;
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+
+        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        groupedStream = stream
+            .groupBy(
+                mapper,
+                Serdes.String(),
+                Serdes.String());
+
+        reducer = new Reducer<String>() {
+            @Override
+            public String apply(String value1, String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        initializer = new Initializer<Integer>() {
+            @Override
+            public Integer apply() {
+                return 0;
+            }
+        };
+        aggregator = new Aggregator<String, String, Integer>() {
+            @Override
+            public Integer apply(String aggKey, String value, Integer aggregate) {
+                return aggregate + value.length();
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+
+    @Test
+    public void shouldReduce() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream
+            .reduce(reducer, "reduce-by-key")
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, String>> results = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 10);
+
+        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
+            @Override
+            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+                return KStreamAggregationIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
+                                             KeyValue.pair("A", "A:A"),
+                                             KeyValue.pair("B", "B"),
+                                             KeyValue.pair("B", "B:B"),
+                                             KeyValue.pair("C", "C"),
+                                             KeyValue.pair("C", "C:C"),
+                                             KeyValue.pair("D", "D"),
+                                             KeyValue.pair("D", "D:D"),
+                                             KeyValue.pair("E", "E"),
+                                             KeyValue.pair("E", "E:E"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
+                                                                            final KeyValue<K, V> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.compareTo(o2.value);
+        }
+        return keyComparison;
+    }
+
+    @Test
+    public void shouldReduceWindowed() throws Exception {
+        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstBatchTimestamp);
+        long secondBatchTimestamp = System.currentTimeMillis();
+        produceMessages(secondBatchTimestamp);
+        produceMessages(secondBatchTimestamp);
+
+        groupedStream
+            .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
+            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, String value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 15);
+
+        Comparator<KeyValue<String, String>>
+            comparator =
+            new Comparator<KeyValue<String, String>>() {
+                @Override
+                public int compare(final KeyValue<String, String> o1,
+                                   final KeyValue<String, String> o2) {
+                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedOutput, comparator);
+        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+
+        assertThat(windowedOutput, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+                new KeyValue<>("B@" + firstBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+                new KeyValue<>("C@" + firstBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+                new KeyValue<>("D@" + firstBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+                new KeyValue<>("E@" + firstBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E:E")
+            )
+        ));
+    }
+
+    @Test
+    public void shouldAggregate() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream.aggregate(
+            initializer,
+            aggregator,
+            Serdes.Integer(),
+            "aggregate-by-selected-key")
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, Integer>> results = receiveMessages(
+            new StringDeserializer(),
+            new IntegerDeserializer()
+            , 10);
+
+        Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
+            @Override
+            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
+                return KStreamAggregationIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("A", 1),
+            KeyValue.pair("A", 2),
+            KeyValue.pair("B", 1),
+            KeyValue.pair("B", 2),
+            KeyValue.pair("C", 1),
+            KeyValue.pair("C", 2),
+            KeyValue.pair("D", 1),
+            KeyValue.pair("D", 2),
+            KeyValue.pair("E", 1),
+            KeyValue.pair("E", 2)
+        )));
+    }
+
+    @Test
+    public void shouldAggregateWindowed() throws Exception {
+        long firstTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstTimestamp);
+        long secondTimestamp = System.currentTimeMillis();
+        produceMessages(secondTimestamp);
+        produceMessages(secondTimestamp);
+
+        groupedStream.aggregate(
+            initializer,
+            aggregator,
+            TimeWindows.of("aggregate-by-key-windowed", 500L),
+            Serdes.Integer())
+            .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, Integer value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+            new StringDeserializer(),
+            new IntegerDeserializer()
+            , 15);
+
+        Comparator<KeyValue<String, Integer>>
+            comparator =
+            new Comparator<KeyValue<String, Integer>>() {
+                @Override
+                public int compare(final KeyValue<String, Integer> o1,
+                                   final KeyValue<String, Integer> o2) {
+                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedMessages, comparator);
+
+        long firstWindow = firstTimestamp / 500 * 500;
+        long secondWindow = secondTimestamp / 500 * 500;
+
+        assertThat(windowedMessages, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstWindow, 1),
+                new KeyValue<>("A@" + secondWindow, 1),
+                new KeyValue<>("A@" + secondWindow, 2),
+                new KeyValue<>("B@" + firstWindow, 1),
+                new KeyValue<>("B@" + secondWindow, 1),
+                new KeyValue<>("B@" + secondWindow, 2),
+                new KeyValue<>("C@" + firstWindow, 1),
+                new KeyValue<>("C@" + secondWindow, 1),
+                new KeyValue<>("C@" + secondWindow, 2),
+                new KeyValue<>("D@" + firstWindow, 1),
+                new KeyValue<>("D@" + secondWindow, 1),
+                new KeyValue<>("D@" + secondWindow, 2),
+                new KeyValue<>("E@" + firstWindow, 1),
+                new KeyValue<>("E@" + secondWindow, 1),
+                new KeyValue<>("E@" + secondWindow, 2)
+            )));
+    }
+
+    @Test
+    public void shouldCount() throws Exception {
+        produceMessages(System.currentTimeMillis());
+
+        groupedStream.count("count-by-key")
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, Long>> results = receiveMessages(
+            new StringDeserializer(),
+            new LongDeserializer()
+            , 10);
+        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+                return KStreamAggregationIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("A", 1L),
+            KeyValue.pair("A", 2L),
+            KeyValue.pair("B", 1L),
+            KeyValue.pair("B", 2L),
+            KeyValue.pair("C", 1L),
+            KeyValue.pair("C", 2L),
+            KeyValue.pair("D", 1L),
+            KeyValue.pair("D", 2L),
+            KeyValue.pair("E", 1L),
+            KeyValue.pair("E", 2L)
+        )));
+    }
+
+    @Test
+    public void shouldGroupByKey() throws Exception {
+        long timestamp = System.currentTimeMillis();
+        produceMessages(timestamp);
+        produceMessages(timestamp);
+
+        stream.groupByKey(Serdes.Integer(), Serdes.String())
+            .count(TimeWindows.of("count-windows", 500L))
+            .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
+                @Override
+                public String apply(final Windowed<Integer> windowedKey, final Long value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            }).to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, Long>> results = receiveMessages(
+            new StringDeserializer(),
+            new LongDeserializer()
+            , 10);
+        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+                return KStreamAggregationIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        long window = timestamp / 500 * 500;
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("1@" + window, 1L),
+            KeyValue.pair("1@" + window, 2L),
+            KeyValue.pair("2@" + window, 1L),
+            KeyValue.pair("2@" + window, 2L),
+            KeyValue.pair("3@" + window, 1L),
+            KeyValue.pair("3@" + window, 2L),
+            KeyValue.pair("4@" + window, 1L),
+            KeyValue.pair("4@" + window, 2L),
+            KeyValue.pair("5@" + window, 1L),
+            KeyValue.pair("5@" + window, 2L)
+        )));
+
+    }
+
+
+    private void produceMessages(long timestamp)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            streamOneInput,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(2, "B"),
+                new KeyValue<>(3, "C"),
+                new KeyValue<>(4, "D"),
+                new KeyValue<>(5, "E")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            timestamp);
+    }
+
+    private void createTopics() {
+        streamOneInput = "stream-one-" + testNo;
+        outputTopic = "output-" + testNo;
+        CLUSTER.createTopic(streamOneInput, 3, 1);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private void startStreams() {
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+    }
+
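+    // Consumes from the output topic until at least numMessages records arrive (or a 60-second timeout).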
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
+                                                        final int numMessages)
+        throws InterruptedException {
+        final Properties consumerProperties = new Properties();
+        consumerProperties
+            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
+                                       "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                       keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                       valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
+                                                                        outputTopic,
+                                                                        numMessages, 60 * 1000);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..b7d4fc3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that demonstrates how to perform a join between a KStream and a
+ * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
+ */
+public class KStreamKTableJoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String USER_CLICKS_TOPIC = "user-clicks";
+    private static final String USER_REGIONS_TOPIC = "user-regions";
+    private static final String OUTPUT_TOPIC = "output-topic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(USER_CLICKS_TOPIC);
+        CLUSTER.createTopic(USER_REGIONS_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+    }
+
+    /**
+     * Tuple for a region and its associated number of clicks.
+     */
+    private static final class RegionWithClicks {
+
+        private final String region;
+        private final long clicks;
+
+        public RegionWithClicks(String region, long clicks) {
+            if (region == null || region.isEmpty()) {
+                throw new IllegalArgumentException("region must be set");
+            }
+            if (clicks < 0) {
+                throw new IllegalArgumentException("clicks must not be negative");
+            }
+            this.region = region;
+            this.clicks = clicks;
+        }
+
+        public String getRegion() {
+            return region;
+        }
+
+        public long getClicks() {
+            return clicks;
+        }
+
+    }
+
+    @Test
+    public void shouldCountClicksPerRegion() throws Exception {
+        // Input 1: Clicks per user (multiple records allowed per user).
+        List<KeyValue<String, Long>> userClicks = Arrays.asList(
+            new KeyValue<>("alice", 13L),
+            new KeyValue<>("bob", 4L),
+            new KeyValue<>("chao", 25L),
+            new KeyValue<>("bob", 19L),
+            new KeyValue<>("dave", 56L),
+            new KeyValue<>("eve", 78L),
+            new KeyValue<>("alice", 40L),
+            new KeyValue<>("fang", 99L)
+        );
+
+        // Input 2: Region per user (multiple records allowed per user).
+        List<KeyValue<String, String>> userRegions = Arrays.asList(
+            new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
+            new KeyValue<>("bob", "americas"),
+            new KeyValue<>("chao", "asia"),
+            new KeyValue<>("dave", "europe"),
+            new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
+            new KeyValue<>("eve", "americas"),
+            new KeyValue<>("fang", "asia")
+        );
+
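+        // Expected output: each click event is joined against the latest region for its user and
+        // added to that region's running total, e.g. "americas" evolves as 4L (bob) -> 23L (bob,
+        // 4 + 19) -> 101L (eve, 23 + 78).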
+        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+            new KeyValue<>("europe", 13L),
+            new KeyValue<>("americas", 4L),
+            new KeyValue<>("asia", 25L),
+            new KeyValue<>("americas", 23L),
+            new KeyValue<>("europe", 69L),
+            new KeyValue<>("americas", 101L),
+            new KeyValue<>("europe", 109L),
+            new KeyValue<>("asia", 124L)
+        );
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+                                 TestUtils.tempDirectory().getPath());
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        // This KStream contains information such as "alice" -> 13L.
+        //
+        // Because this is a KStream ("record stream"), multiple records for the same user will be
+        // considered as separate click-count events, each of which will be added to the total count.
+        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+
+        // This KTable contains information such as "alice" -> "europe".
+        //
+        // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
+        // record key will be considered at the time when a new user-click record (see above) is
+        // received for the `leftJoin` below.  Any previous region values are considered out of
+        // date.  This behavior is quite different from the KStream of user clicks above.
+        //
+        // For example, the user "alice" will be considered to live in "europe" (although originally she
+        // lived in "asia") because, at the time her first user-click record is being received and
+        // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
+        // (which overrides her previous region value of "asia").
+        KTable<String, String> userRegionsTable =
+            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
+
+        // Compute the number of clicks per region, e.g. "europe" -> 13L.
+        //
+        // The resulting KTable is continuously being updated as new data records are arriving in the
+        // input KStream `userClicksStream` and input KTable `userRegionsTable`.
+        KTable<String, Long> clicksPerRegion = userClicksStream
+            // Join the stream against the table.
+            //
+            // Null values possible: In general, null values are possible for region (i.e. the value of
+            // the KTable we are joining against) so we must guard against that (here: by setting the
+            // fallback region "UNKNOWN").  In this specific example this is not really needed because
+            // we know, based on the test setup, that all users have appropriate region entries at the
+            // time we perform the join.
+            //
+            // Also, we need to return a tuple of (region, clicks) for each user.  But because Java does
+            // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
+            // achieve the same effect.
+            .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
+                @Override
+                public RegionWithClicks apply(Long clicks, String region) {
+                    return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
+                }
+            })
+            // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+            .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
+                @Override
+                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+                    return new KeyValue<>(value.getRegion(), value.getClicks());
+                }
+            })
+            // Compute the total per region by summing the individual click counts per region.
+            .groupByKey(stringSerde, longSerde)
+            .reduce(new Reducer<Long>() {
+                @Override
+                public Long apply(Long value1, Long value2) {
+                    return value1 + value2;
+                }
+            }, "ClicksPerRegionUnwindowed");
+
+        // Write the (continuously updating) results to the output topic.
+        clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        //
+        // Step 2: Publish user-region information.
+        //
+        // To keep this code example simple and easy to reason about, we publish all
+        // user-region records before any user-click records (cf. step 3).  In practice though,
+        // data records would typically be arriving concurrently in both input streams/topics.
+        Properties userRegionsProducerConfig = new Properties();
+        userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+
+        //
+        // Step 3: Publish some user click events.
+        //
+        Properties userClicksProducerConfig = new Properties();
+        userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+
+        //
+        // Step 4: Verify the application's output data.
+        //
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+            OUTPUT_TOPIC, expectedClicksPerRegion.size());
+        streams.close();
+        assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 9aaafe6..434216e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -59,8 +60,7 @@ public class KStreamRepartitionJoinTest {
     private KStream<Integer, String> streamTwo;
     private KStream<Integer, String> streamFour;
     private ValueJoiner<Integer, String, String> valueJoiner;
-    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
-        keyMapper;
+    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
 
     private final List<String>
         expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
@@ -77,16 +77,13 @@ public class KStreamRepartitionJoinTest {
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
-                                 applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
 
-
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
         streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
@@ -98,12 +95,7 @@ public class KStreamRepartitionJoinTest {
             }
         };
 
-        keyMapper = new KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>() {
-            @Override
-            public KeyValue<Integer, Integer> apply(final Long key, final Integer value) {
-                return new KeyValue<>(value, value);
-            }
-        };
+        keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper();
     }
 
     @After
@@ -146,19 +138,8 @@ public class KStreamRepartitionJoinTest {
     }
 
     private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
-
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
-
-        final KStream<Integer, String> map2 = streamTwo.map(
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            });
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
+        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
         doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
@@ -185,14 +166,8 @@ public class KStreamRepartitionJoinTest {
 
     public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
 
-        final KStream<Integer, Integer>
-            keySelected =
-            streamOne.selectKey(new KeyValueMapper<Long, Integer, Integer>() {
-                @Override
-                public Integer apply(final Long key, final Integer value) {
-                    return value;
-                }
-            });
+        final KStream<Integer, Integer> keySelected =
+                streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
 
         String outputTopic = "select-key-join";
         doJoin(keySelected, streamTwo, outputTopic, outputTopic);
@@ -239,18 +214,9 @@ public class KStreamRepartitionJoinTest {
     }
 
     public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
-        final KStream<Integer, String> map2 = streamTwo.map(
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            });
+        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
         String outputTopic = "left-join";
         map1.leftJoin(map2,
@@ -266,19 +232,10 @@ public class KStreamRepartitionJoinTest {
 
     private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws
                                                                                    Exception {
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper =
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            };
+            kvMapper = MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
deleted file mode 100644
index 2096d9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test based on a simple map, using an embedded Kafka cluster.
- */
-public class MapFunctionIntegrationTest {
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
-    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void shouldUppercaseTheInput() throws Exception {
-        List<String> inputValues = Arrays.asList("hello", "world");
-        List<String> expectedValues = new ArrayList<>();
-        for (String input : inputValues) {
-            expectedValues.add(input.toUpperCase(Locale.getDefault()));
-        }
-
-        //
-        // Step 1: Configure and start the processor topology.
-        //
-        KStreamBuilder builder = new KStreamBuilder();
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
-        KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
-            @Override
-            public String apply(String value) {
-                return value.toUpperCase(Locale.getDefault());
-            }
-        });
-        uppercased.to(DEFAULT_OUTPUT_TOPIC);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        //
-        // Step 2: Produce some input data to the input topic.
-        //
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
-
-        //
-        // Step 3: Verify the application's output data.
-        //
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-integration-test-standard-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
-            DEFAULT_OUTPUT_TOPIC, inputValues.size());
-        streams.close();
-        assertThat(actualValues, equalTo(expectedValues));
-    }
-
-}
\ No newline at end of file