You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/11 06:48:30 UTC

kafka git commit: MINOR: test ktable state store creation

Repository: kafka
Updated Branches:
  refs/heads/trunk 98a093d6c -> 1dcafadef


MINOR: test ktable state store creation

guozhangwang
* a test for ktable state store creation

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #661 from ymatsuda/more_ktable_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1dcafade
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1dcafade
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1dcafade

Branch: refs/heads/trunk
Commit: 1dcafadefc09aece9037bc930aa0de7617f655b2
Parents: 98a093d
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Dec 10 21:48:27 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 10 21:48:27 2015 -0800

----------------------------------------------------------------------
 .../kstream/internals/KTableImplTest.java       | 87 ++++++++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    |  6 ++
 .../apache/kafka/test/MockProcessorContext.java |  4 +
 3 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 56c5703..2317c97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -128,6 +129,9 @@ public class KTableImplTest {
 
             KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
 
+            // two state store should be created
+            assertEquals(2, driver.allStateStores().size());
+
             KTableValueGetter<String, String> getter1 = getterSupplier1.get();
             getter1.init(driver.context());
             KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
@@ -217,4 +221,87 @@ public class KTableImplTest {
         }
     }
 
+    @Test
+    public void testStateStore() throws IOException {
+        final Serializer<String> serializer = new StringSerializer();
+        final Deserializer<String> deserializer = new StringDeserializer();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            KStreamBuilder builder = new KStreamBuilder();
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, String> table2 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+
+            KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            driver.setTime(0L);
+
+            // no state store should be created
+            assertEquals(0, driver.allStateStores().size());
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+
+        try {
+            KStreamBuilder builder = new KStreamBuilder();
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, String> table2 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+
+            KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            table2.join(table1MappedFiltered,
+                    new ValueJoiner<String, Integer, String>() {
+                        @Override
+                        public String apply(String v1, Integer v2) {
+                            return v1 + v2;
+                        }
+                    });
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            driver.setTime(0L);
+
+            // two state store should be created
+            assertEquals(2, driver.allStateStores().size());
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 8b16cf6..a6c2759 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.io.File;
 import java.util.List;
+import java.util.Map;
 
 public class KStreamTestDriver {
 
@@ -116,6 +117,11 @@ public class KStreamTestDriver {
         }
     }
 
+    public Map<String, StateStore> allStateStores() {
+        return context.allStateStores();
+    }
+
+
     private class MockRecordCollector extends RecordCollector {
         public MockRecordCollector() {
             super(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1dcafade/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 5d42a63..747b4f1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -178,4 +179,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return this.timestamp;
     }
 
+    public Map<String, StateStore> allStateStores() {
+        return Collections.unmodifiableMap(storeMap);
+    }
 }