You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:20 UTC
[06/50] [abbrv] kafka git commit: MINOR: add unit test for
KGroupedTable.count
MINOR: add unit test for KGroupedTable.count
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Michael G. Noll <mi...@confluent.io>
Closes #1255 from dguy/kgroupedtable-count-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b764c5e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b764c5e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b764c5e
Branch: refs/heads/0.10.0
Commit: 1b764c5e834c0d03f3c7107a58f21ad3bbb98ac3
Parents: bc50515
Author: Damian Guy <da...@gmail.com>
Authored: Mon Apr 25 11:18:28 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 25 11:18:28 2016 -0700
----------------------------------------------------------------------
.../internals/KGroupedTableImplTest.java | 86 ++++++++++++++++++++
1 file changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b764c5e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
new file mode 100644
index 0000000..9eeea20
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+/** Verifies the running counts emitted by count() on a table grouped by its values. */
+public class KGroupedTableImplTest {
+
+    /** Temporary state directory handed to the test driver; created and deleted per test. */
+    private File stateDir;
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = Files.createTempDirectory("test").toFile();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        Utils.delete(stateDir);
+    }
+
+    /** Re-keys each record by its value, counts per key, and checks every emitted update. */
+    @Test
+    public void testGroupedCountOccurences() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        // Typed with the counted stream's key/value types, so no raw type or unchecked suppression.
+        final MockProcessorSupplier<String, Long> processorSupplier = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input)
+            .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(final String key, final String value) {
+                    return new KeyValue<>(value, value);
+                }
+            }, Serdes.String(), Serdes.String())
+            .count("count")
+            .toStream()
+            .process(processorSupplier);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
+        driver.process(input, "A", "green");
+        driver.process(input, "B", "green");
+        driver.process(input, "A", "blue"); // A changes value: expect blue:1, then green back to 1
+        driver.process(input, "C", "yellow");
+        driver.process(input, "D", "green");
+
+        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2");
+        final List<String> actual = processorSupplier.processed;
+        assertEquals(expected, actual);
+    }
+}
\ No newline at end of file