You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:20 UTC

[06/50] [abbrv] kafka git commit: MINOR: add unit test for KGroupedTable.count

MINOR: add unit test for KGroupedTable.count

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Michael G. Noll <mi...@confluent.io>

Closes #1255 from dguy/kgroupedtable-count-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b764c5e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b764c5e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b764c5e

Branch: refs/heads/0.10.0
Commit: 1b764c5e834c0d03f3c7107a58f21ad3bbb98ac3
Parents: bc50515
Author: Damian Guy <da...@gmail.com>
Authored: Mon Apr 25 11:18:28 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 25 11:18:28 2016 -0700

----------------------------------------------------------------------
 .../internals/KGroupedTableImplTest.java        | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b764c5e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
new file mode 100644
index 0000000..9eeea20
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+/** Exercises {@code count} on a table that has been regrouped via {@code groupBy}. */
+public class KGroupedTableImplTest {
+    private File stateDir;
+
+    @Before
+    public void setUp() throws IOException {
+        this.stateDir = Files.createTempDirectory("test").toFile();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        Utils.delete(this.stateDir);
+    }
+
+    /** Feeds five table updates through groupBy/count and checks the exact sequence of emitted records. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testGroupedCountOccurences() throws IOException {
+        final String topic = "count-test-input";
+        final MockProcessorSupplier supplier = new MockProcessorSupplier<>();
+        // The grouping key is the record's value; the value itself is carried through unchanged.
+        final KeyValueMapper<String, String, KeyValue<String, String>> byValue =
+            new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(final String key, final String value) {
+                    return new KeyValue<>(value, value);
+                }
+            };
+
+        final KStreamBuilder topology = new KStreamBuilder();
+        topology.table(Serdes.String(), Serdes.String(), topic)
+                .groupBy(byValue, Serdes.String(), Serdes.String())
+                .count("count")
+                .toStream()
+                .process(supplier);
+
+        final KStreamTestDriver testDriver = new KStreamTestDriver(topology, stateDir);
+        final String[][] updates = {{"A", "green"}, {"B", "green"}, {"A", "blue"}, {"C", "yellow"}, {"D", "green"}};
+        for (final String[] update : updates) {
+            testDriver.process(topic, update[0], update[1]);
+        }
+
+        final List<String> observed = supplier.processed;
+        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2");
+        assertEquals(expected, observed);
+    }
+}
\ No newline at end of file