You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/25 03:11:40 UTC

kafka git commit: MINOR: enhance streams system test

Repository: kafka
Updated Branches:
  refs/heads/trunk fa05752cc -> 13b8fb295


MINOR: enhance streams system test

guozhangwang

* add table aggregate to the system test
* actually create change log partition replica

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #966 from ymatsuda/enh_systest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13b8fb29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13b8fb29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13b8fb29

Branch: refs/heads/trunk
Commit: 13b8fb295c3becc27e7954af623fe90b3062409e
Parents: fa05752
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Feb 24 18:11:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 24 18:11:36 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   2 -
 .../internals/InternalTopicManager.java         |   8 +-
 .../internals/ProcessorStateManager.java        |  13 ++-
 .../internals/StreamPartitionAssignor.java      |   9 +-
 .../processor/internals/StreamThread.java       |   4 +-
 .../streams/smoketest/SmokeTestClient.java      |  17 +++
 .../streams/smoketest/SmokeTestDriver.java      | 111 ++++++++++++++++---
 .../kafka/streams/smoketest/SmokeTestUtil.java  |  41 +++++++
 .../streams/smoketest/StreamsSmokeTest.java     |   3 -
 tests/kafkatest/tests/streams_bounce_test.py    |   5 +-
 tests/kafkatest/tests/streams_smoke_test.py     |   3 +-
 11 files changed, 182 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0cf2888..71d1a6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -293,11 +293,9 @@ public class StreamsConfig extends AbstractConfig {
 
     private void removeStreamsSpecificConfigs(Map<String, Object> props) {
         props.remove(StreamsConfig.JOB_ID_CONFIG);
-        props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
-        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
         props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
         props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 3768260..ce95bb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -47,6 +47,7 @@ public class InternalTopicManager {
     private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
 
     private final ZkClient zkClient;
+    private final int replicationFactor;
 
     private class ZKStringSerializer implements ZkSerializer {
 
@@ -72,11 +73,12 @@ public class InternalTopicManager {
         }
     }
 
-    public InternalTopicManager(String zkConnect) {
-        zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    public InternalTopicManager(String zkConnect, int replicationFactor) {
+        this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+        this.replicationFactor = replicationFactor;
     }
 
-    public void makeReady(String topic, int numPartitions, int replicationFactor) {
+    public void makeReady(String topic, int numPartitions) {
         boolean topicNotReady = true;
 
         while (topicNotReady) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30441c5..bae30e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -217,18 +217,23 @@ public class ProcessorStateManager {
                 restoreConsumer.seekToBeginning(storePartition);
             }
 
-            // restore its state from changelog records; while restoring the log end offset
-            // should not change since it is only written by this thread.
+            // restore its state from changelog records
             long limit = offsetLimit(storePartition);
             while (true) {
+                long offset = 0L;
                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
-                    if (record.offset() >= limit) break;
+                    offset = record.offset();
+                    if (offset >= limit) break;
                     stateRestoreCallback.restore(record.key(), record.value());
                 }
 
-                if (restoreConsumer.position(storePartition) == endOffset) {
+                if (offset >= limit) {
+                    break;
+                } else if (restoreConsumer.position(storePartition) == endOffset) {
                     break;
                 } else if (restoreConsumer.position(storePartition) > endOffset) {
+                    // For a logging enabled changelog (no offset limit),
+                    // the log end offset should not change while restoring since it is only written by this thread.
                     throw new IllegalStateException("Log end offset should not change while restoring");
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index f49601c..440efc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -115,8 +115,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         this.topicGroups = streamThread.builder.topicGroups();
 
-        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
-            internalTopicManager = new InternalTopicManager((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
+        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
+            internalTopicManager = new InternalTopicManager(
+                    (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
+                    (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+        }
     }
 
     @Override
@@ -289,7 +292,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                         numPartitions = task.partition + 1;
                 }
 
-                internalTopicManager.makeReady(topic, numPartitions, 1);
+                internalTopicManager.makeReady(topic, numPartitions);
 
                 // wait until the topic metadata has been propagated to all brokers
                 List<PartitionInfo> partitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 10e458a..7d460e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -283,6 +283,9 @@ public class StreamThread extends Thread {
             // already logged in commitAll()
         }
 
+        // Close standby tasks before closing the restore consumer since closing standby tasks uses the restore consumer.
+        removeStandbyTasks();
+
         // We need to first close the underlying clients before closing the state
         // manager, for example we need to make sure producer's message sends
         // have all been acked before the state manager records
@@ -304,7 +307,6 @@ public class StreamThread extends Thread {
         }
 
         removeStreamTasks();
-        removeStandbyTasks();
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 7f1b343..fec447f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -93,6 +93,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
@@ -235,6 +236,22 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
         ).to("wcnt", stringSerializer, longSerializer);
 
+        // test repartition
+        Agg agg = new Agg();
+        cntTable.aggregate(
+                agg.init(),
+                agg.adder(),
+                agg.remover(),
+                agg.selector(),
+                stringSerializer,
+                longSerializer,
+                longSerializer,
+                stringDeserializer,
+                longDeserializer,
+                longDeserializer,
+                "cntByCnt"
+        ).to("tagg", stringSerializer, longSerializer);
+
         return new KafkaStreams(builder, props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index e56a369..c0a6f46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -198,7 +198,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
+        List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
 
@@ -212,6 +212,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         HashMap<String, Long> cnt = new HashMap<>();
         HashMap<String, Double> avg = new HashMap<>();
         HashMap<String, Long> wcnt = new HashMap<>();
+        HashMap<String, Long> tagg = new HashMap<>();
 
         HashSet<String> keys = new HashSet<>();
         HashMap<String, Set<Integer>> received = new HashMap<>();
@@ -268,6 +269,9 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         case "wcnt":
                             wcnt.put(key, longDeserializer.deserialize("", record.value()));
                             break;
+                        case "tagg":
+                            tagg.put(key, longDeserializer.deserialize("", record.value()));
+                            break;
                         default:
                             System.out.println("unknown topic: " + record.topic());
                     }
@@ -301,18 +305,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
             System.out.println("missedRecords=" + missedCount);
         }
 
-        success &= verifyMin(min);
-        success &= verifyMax(max);
-        success &= verifyDif(dif);
-        success &= verifySum(sum);
-        success &= verifyCnt(cnt);
-        success &= verifyAvg(avg);
-        success &= verifyWCnt(wcnt);
+        success &= verifyMin(min, allData);
+        success &= verifyMax(max, allData);
+        success &= verifyDif(dif, allData);
+        success &= verifySum(sum, allData);
+        success &= verifyCnt(cnt, allData);
+        success &= verifyAvg(avg, allData);
+        success &= verifyWCnt(wcnt, allData);
+        success &= verifyTAgg(tagg, allData);
 
         System.out.println(success ? "SUCCESS" : "FAILURE");
     }
 
-    private static boolean verifyMin(Map<String, Integer> map) {
+    private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("min is empty");
@@ -320,6 +325,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying min");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int expected = getMin(entry.getKey());
                 if (expected != entry.getValue()) {
@@ -331,7 +340,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyMax(Map<String, Integer> map) {
+    private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("max is empty");
@@ -339,6 +348,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying max");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int expected = getMax(entry.getKey());
                 if (expected != entry.getValue()) {
@@ -350,7 +363,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyDif(Map<String, Integer> map) {
+    private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("dif is empty");
@@ -358,6 +371,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying dif");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -371,7 +388,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyCnt(Map<String, Long> map) {
+    private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("cnt is empty");
@@ -379,6 +396,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying cnt");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -392,7 +413,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifySum(Map<String, Long> map) {
+    private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("sum is empty");
@@ -400,6 +421,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying sum");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -413,7 +438,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyAvg(Map<String, Double> map) {
+    private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("avg is empty");
@@ -421,6 +446,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying avg");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry<String, Double> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -435,7 +464,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyWCnt(Map<String, Long> map) {
+    private static boolean verifyWCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("wcnt is empty");
@@ -443,6 +472,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying wcnt");
 
+            int expectedSize = 0;
+            for (Set<Integer> values : allData.values()) {
+                int maxValue = Collections.max(values);
+                int minValue = Collections.min(values);
+                expectedSize += maxValue / WINDOW_SIZE + 1;
+                expectedSize -= minValue / WINDOW_SIZE;
+            }
+            if (map.size() != expectedSize) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + expectedSize);
+                success = false;
+            }
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
                 long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
@@ -461,6 +501,47 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
+    private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>> allData) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("tagg is empty");
+            success = false;
+        } else {
+            System.out.println("verifying tagg");
+
+            // generate expected answer
+            Map<String, Long> expected = new HashMap<>();
+            for (String key : allData.keySet()) {
+                int min = getMin(key);
+                int max = getMax(key);
+                String cnt = Long.toString(max - min + 1L);
+
+                if (expected.containsKey(cnt)) {
+                    expected.put(cnt, expected.get(cnt) + 1L);
+                } else {
+                    expected.put(cnt, 1L);
+                }
+            }
+
+            // check the result
+            for (Map.Entry<String, Long> entry : map.entrySet()) {
+                String key = entry.getKey();
+                Long expectedCount = expected.remove(key);
+                if (expectedCount == null)
+                    expectedCount = 0L;
+
+                if (entry.getValue() != expectedCount) {
+                    System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
+                    success = false;
+                }
+            }
+            for (Map.Entry<String, Long> entry : expected.entrySet()) {
+                System.out.println("fail: missingKey=" + entry.getKey() + " expected=" + entry.getValue());
+            }
+        }
+        return success;
+    }
+
     private static int getMin(String key) {
         return Integer.parseInt(key.split("-")[0]);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 4a13599..3f5503f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Processor;
@@ -87,6 +89,45 @@ public class SmokeTestUtil {
         }
     }
 
+    public static class Agg {
+
+        public KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
+                @Override
+                public KeyValue<String, Long> apply(String key, Long value) {
+                    return new KeyValue<>(Long.toString(value), 1L);
+                }
+            };
+        }
+
+        public Initializer<Long> init() {
+            return new Initializer<Long>() {
+                @Override
+                public Long apply() {
+                    return 0L;
+                }
+            };
+        }
+
+        public Aggregator<String, Long, Long> adder() {
+            return new Aggregator<String, Long, Long>() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate + value;
+                }
+            };
+        }
+
+        public Aggregator<String, Long, Long> remover() {
+            return new Aggregator<String, Long, Long>() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate - value;
+                }
+            };
+        }
+    }
+
     public static Serializer<String> stringSerializer = new StringSerializer();
 
     public static Deserializer<String> stringDeserializer = new StringDeserializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index a6cd141..c26544e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -21,9 +21,6 @@ import java.io.File;
 import java.util.Map;
 import java.util.Set;
 
-/**
- * Created by yasuhiro on 2/10/16.
- */
 public class StreamsSmokeTest {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
index 176f010..2b9c4d6 100644
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -33,7 +33,8 @@ class StreamsBounceTest(KafkaTest):
             'dif' : { 'partitions': 5, 'replication-factor': 2 },
             'cnt' : { 'partitions': 5, 'replication-factor': 2 },
             'avg' : { 'partitions': 5, 'replication-factor': 2 },
-            'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
+            'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
+            'tagg' : { 'partitions': 5, 'replication-factor': 2 }
         })
 
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@@ -58,7 +59,7 @@ class StreamsBounceTest(KafkaTest):
         # enable this after we add change log partition replicas
         #self.kafka.signal_leader("data")
 
-        time.sleep(15);
+        #time.sleep(15);
 
         self.processor1.abortThenRestart()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
index 2861837..48e4db8 100644
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -33,7 +33,8 @@ class StreamsSmokeTest(KafkaTest):
             'dif' : { 'partitions': 5, 'replication-factor': 1 },
             'cnt' : { 'partitions': 5, 'replication-factor': 1 },
             'avg' : { 'partitions': 5, 'replication-factor': 1 },
-            'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+            'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'tagg' : { 'partitions': 5, 'replication-factor': 1 }
         })
 
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)