You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:56 UTC
[40/50] [abbrv] kafka git commit: KAFKA-3512: Added foreach operator
KAFKA-3512: Added foreach operator
miguno guozhangwang please have a look if you can.
Author: Eno Thereska <en...@gmail.com>
Reviewers: Michael G. Noll <mi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #1193 from enothereska/kafka-3512-ForEach
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9beafae2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9beafae2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9beafae2
Branch: refs/heads/0.10.0
Commit: 9beafae23a83774fc1d9ea811d449eac34240363
Parents: 3a58407
Author: Eno Thereska <en...@gmail.com>
Authored: Fri Apr 8 09:17:05 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 8 09:17:05 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/ForeachAction.java | 35 ++++++++
.../apache/kafka/streams/kstream/KStream.java | 8 ++
.../apache/kafka/streams/kstream/KTable.java | 8 ++
.../kstream/internals/KStreamForeach.java | 44 ++++++++++
.../streams/kstream/internals/KStreamImpl.java | 8 ++
.../streams/kstream/internals/KTableImpl.java | 15 ++++
.../kstream/internals/KStreamForeachTest.java | 85 ++++++++++++++++++++
.../kstream/internals/KTableForeachTest.java | 85 ++++++++++++++++++++
8 files changed, 288 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
new file mode 100644
index 0000000..83064e8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+
+/**
+ * The ForeachAction interface for performing an action on a key-value pair.
+ * Note that this action is stateless. If stateful processing is required, consider
+ * using {@link KStream#transform(TransformerSupplier, String...)} or
+ * {@link KStream#process(ProcessorSupplier, String...)} instead.
+ *
+ * @param <K> original key type
+ * @param <V> original value type
+ */
+public interface ForeachAction<K, V> {
+ /**
+ * Perform an action on a single key-value pair.
+ *
+ * @param key the key of the pair
+ * @param value the value of the pair
+ */
+ void apply(K key, V value);
+}
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e4933cb..a55e726 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -101,6 +101,14 @@ public interface KStream<K, V> {
KStream<K, V> through(String topic);
/**
+ * Perform an action on each element of {@link KStream}.
+ * Note that this is a terminal operation that returns void,
+ * so no further operators can be chained after it.
+ *
+ * @param action An action to perform on each element; it is given the element's key and value
+ */
+ void foreach(ForeachAction<K, V> action);
+
+ /**
* Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
* using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
* This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 581ee28..1f6ee68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -297,4 +297,12 @@ public interface KTable<K, V> {
* @param <K1> the key type of the aggregated {@link KTable}
*/
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+
+ /**
+ * Perform an action on each element of {@link KTable}.
+ * Note that this is a terminal operation that returns void,
+ * so no further operators can be chained after it.
+ *
+ * @param action An action to perform on each element; it is given the element's key and value
+ */
+ void foreach(ForeachAction<K, V> action);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
new file mode 100644
index 0000000..2fd7ef9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * {@link ProcessorSupplier} behind the {@code foreach} operator: each processor it
+ * supplies passes the key and value of every record to the configured {@link ForeachAction}.
+ *
+ * @param <K> key type of the processed records
+ * @param <V> value type of the processed records
+ */
+class KStreamForeach<K, V> implements ProcessorSupplier<K, V> {
+
+    private final ForeachAction<K, V> action;
+
+    public KStreamForeach(ForeachAction<K, V> action) {
+        this.action = action;
+    }
+
+    /**
+     * Creates a new processor on every call; all of them share this supplier's action.
+     */
+    @Override
+    public Processor<K, V> get() {
+        return new ForeachProcessor();
+    }
+
+    /**
+     * Applies the enclosing supplier's action to each record and forwards nothing downstream.
+     */
+    private class ForeachProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            KStreamForeach.this.action.apply(key, value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0fb3984..c266328 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -92,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
+
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
super(topology, name, sourceNodes);
}
@@ -201,6 +204,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
+ public void foreach(ForeachAction<K, V> action) {
+ String name = topology.newName(FOREACH_NAME);
+
+ topology.addProcessor(name, new KStreamForeach(action), this.name);
+ }
public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
return through(keySerde, valSerde, null, topic);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 156f2db..8de9a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -76,6 +77,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
public final ProcessorSupplier<?, ?> processorSupplier;
private final Serde<K> keySerde;
@@ -142,6 +145,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
+    public void foreach(final ForeachAction<K, V> action) {
+        String name = topology.newName(FOREACH_NAME);
+
+        // The adapter below receives each record as a Change<V> and hands only its
+        // newValue to the caller's action.
+        // NOTE(review): newValue is presumably null for deletions, in which case the
+        // action receives a null value -- confirm against Change's contract.
+        //
+        // The local is named foreachSupplier so it does not shadow this class's
+        // processorSupplier field, and the diamond replaces the raw KStreamForeach type.
+        KStreamForeach<K, Change<V>> foreachSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
+            @Override
+            public void apply(K key, Change<V> value) {
+                action.apply(key, value.newValue);
+            }
+        });
+
+        topology.addProcessor(name, foreachSupplier, this.name);
+    }
+
+ @Override
public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
return through(keySerde, valSerde, null, topic);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
new file mode 100644
index 0000000..6573779
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamForeachTest {
+
+    final private String topicName = "topic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    /**
+     * Pipes four records through a stream that ends in {@code foreach} and checks that the
+     * action saw every record, in input order, by comparing what it collected (key doubled,
+     * value upper-cased) with the expected list.
+     */
+    @Test
+    public void testForeach() {
+        // Given
+        final List<KeyValue<Integer, String>> input = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        final List<KeyValue<Integer, String>> expected = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> collected = new ArrayList<>();
+        final ForeachAction<Integer, String> collector = new ForeachAction<Integer, String>() {
+            @Override
+            public void apply(Integer key, String value) {
+                collected.add(new KeyValue<>(key * 2, value.toUpperCase()));
+            }
+        };
+
+        // When
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+        stream.foreach(collector);
+
+        // Then
+        final KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < input.size(); i++) {
+            final KeyValue<Integer, String> kv = input.get(i);
+            driver.process(topicName, kv.key, kv.value);
+        }
+
+        assertEquals(expected.size(), collected.size());
+        // List equality compares the elements pairwise, in order, via KeyValue.equals.
+        assertEquals(expected, collected);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9beafae2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
new file mode 100644
index 0000000..4b612a5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableForeachTest {
+
+    final private String topicName = "topic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    /**
+     * Pipes four records through a table that ends in {@code foreach} and checks that the
+     * action saw every record, in input order, by comparing what it collected (key doubled,
+     * value upper-cased) with the expected list.
+     */
+    @Test
+    public void testForeach() {
+        // Given
+        final List<KeyValue<Integer, String>> input = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        final List<KeyValue<Integer, String>> expected = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> collected = new ArrayList<>();
+        final ForeachAction<Integer, String> collector = new ForeachAction<Integer, String>() {
+            @Override
+            public void apply(Integer key, String value) {
+                collected.add(new KeyValue<>(key * 2, value.toUpperCase()));
+            }
+        };
+
+        // When
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName);
+        table.foreach(collector);
+
+        // Then
+        final KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < input.size(); i++) {
+            final KeyValue<Integer, String> kv = input.get(i);
+            driver.process(topicName, kv.key, kv.value);
+        }
+
+        assertEquals(expected.size(), collected.size());
+        // List equality compares the elements pairwise, in order, via KeyValue.equals.
+        assertEquals(expected, collected);
+    }
+}