You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/25 03:11:40 UTC
kafka git commit: MINOR: enhance streams system test
Repository: kafka
Updated Branches:
refs/heads/trunk fa05752cc -> 13b8fb295
MINOR: enhance streams system test
guozhangwang
* add table aggregate to the system test
* actually create change log partition replica
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #966 from ymatsuda/enh_systest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13b8fb29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13b8fb29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13b8fb29
Branch: refs/heads/trunk
Commit: 13b8fb295c3becc27e7954af623fe90b3062409e
Parents: fa05752
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Feb 24 18:11:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 24 18:11:36 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 2 -
.../internals/InternalTopicManager.java | 8 +-
.../internals/ProcessorStateManager.java | 13 ++-
.../internals/StreamPartitionAssignor.java | 9 +-
.../processor/internals/StreamThread.java | 4 +-
.../streams/smoketest/SmokeTestClient.java | 17 +++
.../streams/smoketest/SmokeTestDriver.java | 111 ++++++++++++++++---
.../kafka/streams/smoketest/SmokeTestUtil.java | 41 +++++++
.../streams/smoketest/StreamsSmokeTest.java | 3 -
tests/kafkatest/tests/streams_bounce_test.py | 5 +-
tests/kafkatest/tests/streams_smoke_test.py | 3 +-
11 files changed, 182 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0cf2888..71d1a6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -293,11 +293,9 @@ public class StreamsConfig extends AbstractConfig {
private void removeStreamsSpecificConfigs(Map<String, Object> props) {
props.remove(StreamsConfig.JOB_ID_CONFIG);
- props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
props.remove(StreamsConfig.STATE_DIR_CONFIG);
props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
- props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 3768260..ce95bb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -47,6 +47,7 @@ public class InternalTopicManager {
private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
private final ZkClient zkClient;
+ private final int replicationFactor;
private class ZKStringSerializer implements ZkSerializer {
@@ -72,11 +73,12 @@ public class InternalTopicManager {
}
}
- public InternalTopicManager(String zkConnect) {
- zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+ public InternalTopicManager(String zkConnect, int replicationFactor) {
+ this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+ this.replicationFactor = replicationFactor;
}
- public void makeReady(String topic, int numPartitions, int replicationFactor) {
+ public void makeReady(String topic, int numPartitions) {
boolean topicNotReady = true;
while (topicNotReady) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30441c5..bae30e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -217,18 +217,23 @@ public class ProcessorStateManager {
restoreConsumer.seekToBeginning(storePartition);
}
- // restore its state from changelog records; while restoring the log end offset
- // should not change since it is only written by this thread.
+ // restore its state from changelog records
long limit = offsetLimit(storePartition);
while (true) {
+ long offset = 0L;
for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
- if (record.offset() >= limit) break;
+ offset = record.offset();
+ if (offset >= limit) break;
stateRestoreCallback.restore(record.key(), record.value());
}
- if (restoreConsumer.position(storePartition) == endOffset) {
+ if (offset >= limit) {
+ break;
+ } else if (restoreConsumer.position(storePartition) == endOffset) {
break;
} else if (restoreConsumer.position(storePartition) > endOffset) {
+ // For a logging enabled changelog (no offset limit),
+ // the log end offset should not change while restoring since it is only written by this thread.
throw new IllegalStateException("Log end offset should not change while restoring");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index f49601c..440efc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -115,8 +115,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
this.topicGroups = streamThread.builder.topicGroups();
- if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
- internalTopicManager = new InternalTopicManager((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
+ if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
+ internalTopicManager = new InternalTopicManager(
+ (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
+ (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+ }
}
@Override
@@ -289,7 +292,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
numPartitions = task.partition + 1;
}
- internalTopicManager.makeReady(topic, numPartitions, 1);
+ internalTopicManager.makeReady(topic, numPartitions);
// wait until the topic metadata has been propagated to all brokers
List<PartitionInfo> partitions;
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 10e458a..7d460e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -283,6 +283,9 @@ public class StreamThread extends Thread {
// already logged in commitAll()
}
+ // Close standby tasks before closing the restore consumer since closing standby tasks uses the restore consumer.
+ removeStandbyTasks();
+
// We need to first close the underlying clients before closing the state
// manager, for example we need to make sure producer's message sends
// have all been acked before the state manager records
@@ -304,7 +307,6 @@ public class StreamThread extends Thread {
}
removeStreamTasks();
- removeStandbyTasks();
log.info("Stream thread shutdown complete [" + this.getName() + "]");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 7f1b343..fec447f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -93,6 +93,7 @@ public class SmokeTestClient extends SmokeTestUtil {
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+ props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
@@ -235,6 +236,22 @@ public class SmokeTestClient extends SmokeTestUtil {
}
).to("wcnt", stringSerializer, longSerializer);
+ // test repartition
+ Agg agg = new Agg();
+ cntTable.aggregate(
+ agg.init(),
+ agg.adder(),
+ agg.remover(),
+ agg.selector(),
+ stringSerializer,
+ longSerializer,
+ longSerializer,
+ stringDeserializer,
+ longDeserializer,
+ longDeserializer,
+ "cntByCnt"
+ ).to("tagg", stringSerializer, longSerializer);
+
return new KafkaStreams(builder, props);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index e56a369..c0a6f46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -198,7 +198,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
- List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
+ List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
consumer.assign(partitions);
consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
@@ -212,6 +212,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
HashMap<String, Long> cnt = new HashMap<>();
HashMap<String, Double> avg = new HashMap<>();
HashMap<String, Long> wcnt = new HashMap<>();
+ HashMap<String, Long> tagg = new HashMap<>();
HashSet<String> keys = new HashSet<>();
HashMap<String, Set<Integer>> received = new HashMap<>();
@@ -268,6 +269,9 @@ public class SmokeTestDriver extends SmokeTestUtil {
case "wcnt":
wcnt.put(key, longDeserializer.deserialize("", record.value()));
break;
+ case "tagg":
+ tagg.put(key, longDeserializer.deserialize("", record.value()));
+ break;
default:
System.out.println("unknown topic: " + record.topic());
}
@@ -301,18 +305,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
System.out.println("missedRecords=" + missedCount);
}
- success &= verifyMin(min);
- success &= verifyMax(max);
- success &= verifyDif(dif);
- success &= verifySum(sum);
- success &= verifyCnt(cnt);
- success &= verifyAvg(avg);
- success &= verifyWCnt(wcnt);
+ success &= verifyMin(min, allData);
+ success &= verifyMax(max, allData);
+ success &= verifyDif(dif, allData);
+ success &= verifySum(sum, allData);
+ success &= verifyCnt(cnt, allData);
+ success &= verifyAvg(avg, allData);
+ success &= verifyWCnt(wcnt, allData);
+ success &= verifyTAgg(tagg, allData);
System.out.println(success ? "SUCCESS" : "FAILURE");
}
- private static boolean verifyMin(Map<String, Integer> map) {
+ private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("min is empty");
@@ -320,6 +325,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying min");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int expected = getMin(entry.getKey());
if (expected != entry.getValue()) {
@@ -331,7 +340,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifyMax(Map<String, Integer> map) {
+ private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("max is empty");
@@ -339,6 +348,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying max");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int expected = getMax(entry.getKey());
if (expected != entry.getValue()) {
@@ -350,7 +363,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifyDif(Map<String, Integer> map) {
+ private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("dif is empty");
@@ -358,6 +371,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying dif");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
@@ -371,7 +388,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifyCnt(Map<String, Long> map) {
+ private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("cnt is empty");
@@ -379,6 +396,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying cnt");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Long> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
@@ -392,7 +413,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifySum(Map<String, Long> map) {
+ private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("sum is empty");
@@ -400,6 +421,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying sum");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Long> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
@@ -413,7 +438,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifyAvg(Map<String, Double> map) {
+ private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("avg is empty");
@@ -421,6 +446,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying avg");
+ if (map.size() != allData.size()) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+ success = false;
+ }
for (Map.Entry<String, Double> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
@@ -435,7 +464,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
- private static boolean verifyWCnt(Map<String, Long> map) {
+ private static boolean verifyWCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("wcnt is empty");
@@ -443,6 +472,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
} else {
System.out.println("verifying wcnt");
+ int expectedSize = 0;
+ for (Set<Integer> values : allData.values()) {
+ int maxValue = Collections.max(values);
+ int minValue = Collections.min(values);
+ expectedSize += maxValue / WINDOW_SIZE + 1;
+ expectedSize -= minValue / WINDOW_SIZE;
+ }
+ if (map.size() != expectedSize) {
+ System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + expectedSize);
+ success = false;
+ }
for (Map.Entry<String, Long> entry : map.entrySet()) {
long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
@@ -461,6 +501,47 @@ public class SmokeTestDriver extends SmokeTestUtil {
return success;
}
+ private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>> allData) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("tagg is empty");
+ success = false;
+ } else {
+ System.out.println("verifying tagg");
+
+ // generate expected answer
+ Map<String, Long> expected = new HashMap<>();
+ for (String key : allData.keySet()) {
+ int min = getMin(key);
+ int max = getMax(key);
+ String cnt = Long.toString(max - min + 1L);
+
+ if (expected.containsKey(cnt)) {
+ expected.put(cnt, expected.get(cnt) + 1L);
+ } else {
+ expected.put(cnt, 1L);
+ }
+ }
+
+ // check the result
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ String key = entry.getKey();
+ Long expectedCount = expected.remove(key);
+ if (expectedCount == null)
+ expectedCount = 0L;
+
+ if (entry.getValue() != expectedCount) {
+ System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
+ success = false;
+ }
+ }
+ for (Map.Entry<String, Long> entry : expected.entrySet()) {
+ System.out.println("fail: missingKey=" + entry.getKey() + " expected=" + entry.getValue());
+ }
+ }
+ return success;
+ }
+
private static int getMin(String key) {
return Integer.parseInt(key.split("-")[0]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 4a13599..3f5503f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
@@ -87,6 +89,45 @@ public class SmokeTestUtil {
}
}
+ public static class Agg {
+
+ public KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+ return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
+ @Override
+ public KeyValue<String, Long> apply(String key, Long value) {
+ return new KeyValue<>(Long.toString(value), 1L);
+ }
+ };
+ }
+
+ public Initializer<Long> init() {
+ return new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ };
+ }
+
+ public Aggregator<String, Long, Long> adder() {
+ return new Aggregator<String, Long, Long>() {
+ @Override
+ public Long apply(String aggKey, Long value, Long aggregate) {
+ return aggregate + value;
+ }
+ };
+ }
+
+ public Aggregator<String, Long, Long> remover() {
+ return new Aggregator<String, Long, Long>() {
+ @Override
+ public Long apply(String aggKey, Long value, Long aggregate) {
+ return aggregate - value;
+ }
+ };
+ }
+ }
+
public static Serializer<String> stringSerializer = new StringSerializer();
public static Deserializer<String> stringDeserializer = new StringDeserializer();
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index a6cd141..c26544e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -21,9 +21,6 @@ import java.io.File;
import java.util.Map;
import java.util.Set;
-/**
- * Created by yasuhiro on 2/10/16.
- */
public class StreamsSmokeTest {
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
index 176f010..2b9c4d6 100644
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -33,7 +33,8 @@ class StreamsBounceTest(KafkaTest):
'dif' : { 'partitions': 5, 'replication-factor': 2 },
'cnt' : { 'partitions': 5, 'replication-factor': 2 },
'avg' : { 'partitions': 5, 'replication-factor': 2 },
- 'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
+ 'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
+ 'tagg' : { 'partitions': 5, 'replication-factor': 2 }
})
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@@ -58,7 +59,7 @@ class StreamsBounceTest(KafkaTest):
# enable this after we add change log partition replicas
#self.kafka.signal_leader("data")
- time.sleep(15);
+ #time.sleep(15);
self.processor1.abortThenRestart()
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
index 2861837..48e4db8 100644
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -33,7 +33,8 @@ class StreamsSmokeTest(KafkaTest):
'dif' : { 'partitions': 5, 'replication-factor': 1 },
'cnt' : { 'partitions': 5, 'replication-factor': 1 },
'avg' : { 'partitions': 5, 'replication-factor': 1 },
- 'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+ 'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
+ 'tagg' : { 'partitions': 5, 'replication-factor': 1 }
})
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)