You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:56 UTC

[40/50] [abbrv] kafka git commit: KAFKA-3512: Added foreach operator

KAFKA-3512: Added foreach operator

miguno guozhangwang please have a look if you can.

Author: Eno Thereska <en...@gmail.com>

Reviewers: Michael G. Noll <mi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #1193 from enothereska/kafka-3512-ForEach


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9beafae2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9beafae2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9beafae2

Branch: refs/heads/0.10.0
Commit: 9beafae23a83774fc1d9ea811d449eac34240363
Parents: 3a58407
Author: Eno Thereska <en...@gmail.com>
Authored: Fri Apr 8 09:17:05 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 8 09:17:05 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/ForeachAction.java    | 35 ++++++++
 .../apache/kafka/streams/kstream/KStream.java   |  8 ++
 .../apache/kafka/streams/kstream/KTable.java    |  8 ++
 .../kstream/internals/KStreamForeach.java       | 44 ++++++++++
 .../streams/kstream/internals/KStreamImpl.java  |  8 ++
 .../streams/kstream/internals/KTableImpl.java   | 15 ++++
 .../kstream/internals/KStreamForeachTest.java   | 85 ++++++++++++++++++++
 .../kstream/internals/KTableForeachTest.java    | 85 ++++++++++++++++++++
 8 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
new file mode 100644
index 0000000..83064e8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+
+/**
+ * The ForeachAction interface for performing an action on a key-value pair.
+ * Note that this action is stateless. If stateful processing is required, consider
+ * using {@link KStream#transform(TransformerSupplier, String...)} or
+ * {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)}
+ * instead.
+ *
+ * @param <K>   original key type
+ * @param <V>   original value type
+ */
+public interface ForeachAction<K, V> {
+    /** Performs the action on one key-value pair; no result is returned. */
+    void apply(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e4933cb..a55e726 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -101,6 +101,14 @@ public interface KStream<K, V> {
     KStream<K, V> through(String topic);
 
     /**
+     * Perform an action on each element of {@link KStream}.
+     * Note that this is a terminal operation that returns void.
+     *
+     * @param action An action to perform on each element; it is given the element's key and value
+     */
+    void foreach(ForeachAction<K, V> action);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
      * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
      * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 581ee28..1f6ee68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -297,4 +297,12 @@ public interface KTable<K, V> {
      * @param <K1>          the key type of the aggregated {@link KTable}
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+
+    /**
+     * Perform an action on each element of {@link KTable}.
+     * Note that this is a terminal operation that returns void.
+     *
+     * @param action An action to perform on each element; {@code KTableImpl} calls it with the key and the new value of each change
+     */
+    void foreach(ForeachAction<K, V> action);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
new file mode 100644
index 0000000..2fd7ef9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamForeach<K, V> implements ProcessorSupplier<K, V> {
+    // Supplies processors that hand each key-value pair they receive to the ForeachAction below.
+    private final ForeachAction<K, V> action;
+
+    public KStreamForeach(ForeachAction<K, V> action) {
+        this.action = action;
+    }
+    // Returns a new processor on every call; all of them share this supplier's action.
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamForeachProcessor();
+    }
+    // Inner (non-static) class so that process() can reach the enclosing supplier's action.
+    private class KStreamForeachProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            action.apply(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0fb3984..c266328 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -92,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
+    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
+
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         super(topology, name, sourceNodes);
     }
@@ -201,6 +204,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public void foreach(ForeachAction<K, V> action) { // terminal operation: adds a processor node, returns nothing
+        topology.addProcessor(topology.newName(FOREACH_NAME), new KStreamForeach<>(action), this.name);
+    }
+
+    @Override
     public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 156f2db..8de9a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -76,6 +77,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
     public final ProcessorSupplier<?, ?> processorSupplier;
 
     private final Serde<K> keySerde;
@@ -142,6 +145,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public void foreach(final ForeachAction<K, V> action) {
+        // The attached processor is typed on Change<V> values; hand only the new value to the caller's action.
+        KStreamForeach<K, Change<V>> foreachSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
+            @Override
+            public void apply(K key, Change<V> value) {
+                action.apply(key, value.newValue);
+            }
+        });
+        topology.addProcessor(topology.newName(FOREACH_NAME), foreachSupplier, this.name);
+    }
+
+    @Override
     public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
new file mode 100644
index 0000000..6573779
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamForeachTest {
+    // Checks that KStream#foreach calls the action once per input record, in input order.
+
+    private final String topicName = "topic";
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void testForeach() {
+        // Given: four input records, and what the action below should collect for each of them
+        List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+        ForeachAction<Integer, String> action =
+            new ForeachAction<Integer, String>() {
+                @Override
+                public void apply(Integer key, String value) {
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                }
+            };
+
+        // When: a stream whose only operator is foreach(action) ...
+        KStreamBuilder builder = new KStreamBuilder();
+        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+        stream.foreach(action);
+
+        // ... is fed every input record through the test driver
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record : inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+
+        assertEquals(expectedRecords.size(), actualRecords.size());
+        for (int i = 0; i < expectedRecords.size(); i++) {
+            KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+            KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+            assertEquals(expectedRecord, actualRecord);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
new file mode 100644
index 0000000..4b612a5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableForeachTest {
+    // Checks that KTable#foreach calls the action once per input record, in input order.
+
+    private final String topicName = "topic";
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void testForeach() {
+        // Given: four input records, and what the action below should collect for each of them
+        List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+        ForeachAction<Integer, String> action =
+            new ForeachAction<Integer, String>() {
+                @Override
+                public void apply(Integer key, String value) {
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                }
+            };
+
+        // When: a table whose only operator is foreach(action) ...
+        KStreamBuilder builder = new KStreamBuilder();
+        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName);
+        table.foreach(action);
+
+        // ... is fed every input record through the test driver
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record : inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+
+        assertEquals(expectedRecords.size(), actualRecords.size());
+        for (int i = 0; i < expectedRecords.size(); i++) {
+            KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+            KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+            assertEquals(expectedRecord, actualRecord);
+        }
+    }
+}