You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/11 06:48:30 UTC
kafka git commit: MINOR: test ktable state store creation
Repository: kafka
Updated Branches:
refs/heads/trunk 98a093d6c -> 1dcafadef
MINOR: test ktable state store creation
guozhangwang
* a test for ktable state store creation
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #661 from ymatsuda/more_ktable_test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1dcafade
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1dcafade
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1dcafade
Branch: refs/heads/trunk
Commit: 1dcafadefc09aece9037bc930aa0de7617f655b2
Parents: 98a093d
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Dec 10 21:48:27 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 10 21:48:27 2015 -0800
----------------------------------------------------------------------
.../kstream/internals/KTableImplTest.java | 87 ++++++++++++++++++++
.../apache/kafka/test/KStreamTestDriver.java | 6 ++
.../apache/kafka/test/MockProcessorContext.java | 4 +
3 files changed, 97 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 56c5703..2317c97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -128,6 +129,9 @@ public class KTableImplTest {
KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ // two state store should be created
+ assertEquals(2, driver.allStateStores().size());
+
KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.context());
KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
@@ -217,4 +221,87 @@ public class KTableImplTest {
}
}
+ @Test
+ public void testStateStore() throws IOException {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, String> table2 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+
+ KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ driver.setTime(0L);
+
+ // no state store should be created
+ assertEquals(0, driver.allStateStores().size());
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+
+ try {
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, String> table2 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+
+ KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ table2.join(table1MappedFiltered,
+ new ValueJoiner<String, Integer, String>() {
+ @Override
+ public String apply(String v1, Integer v2) {
+ return v1 + v2;
+ }
+ });
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ driver.setTime(0L);
+
+ // two state store should be created
+ assertEquals(2, driver.allStateStores().size());
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 8b16cf6..a6c2759 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.io.File;
import java.util.List;
+import java.util.Map;
public class KStreamTestDriver {
@@ -116,6 +117,11 @@ public class KStreamTestDriver {
}
}
+ public Map<String, StateStore> allStateStores() {
+ return context.allStateStores();
+ }
+
+
private class MockRecordCollector extends RecordCollector {
public MockRecordCollector() {
super(null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 5d42a63..747b4f1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -178,4 +179,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
return this.timestamp;
}
+ public Map<String, StateStore> allStateStores() {
+ return Collections.unmodifiableMap(storeMap);
+ }
}