You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/17 05:18:30 UTC
kafka git commit: KAFKA-3430: Allow users to set key in
KTable.toStream and in KStream.
Repository: kafka
Updated Branches:
refs/heads/trunk 5236bf60d -> b1691cf49
KAFKA-3430: Allow users to set key in KTable.toStream and in KStream.
… With KStream, the method selectKey was added to enable deriving a key from values before performing aggregation-by-key operations on original streams that have null keys.
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1222 from bbejeck/KAFKA-3430_allow_users_to_set_key_KTable_toStream
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1691cf4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1691cf4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1691cf4
Branch: refs/heads/trunk
Commit: b1691cf49e9de850ac8a2675c487af9fb60bfdaa
Parents: 5236bf6
Author: Bill Bejeck <bb...@gmail.com>
Authored: Sat Apr 16 20:18:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Apr 16 20:18:27 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 9 ++
.../apache/kafka/streams/kstream/KTable.java | 9 ++
.../streams/kstream/internals/KStreamImpl.java | 15 ++++
.../streams/kstream/internals/KTableImpl.java | 5 ++
.../kstream/internals/KStreamSelectKeyTest.java | 83 ++++++++++++++++++
.../kstream/internals/KTableMapKeysTest.java | 88 ++++++++++++++++++++
6 files changed, 209 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 27475aa..7e3562c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -46,6 +46,15 @@ public interface KStream<K, V> {
*/
KStream<K, V> filterNot(Predicate<K, V> predicate);
+
+ /**
+ * Create a new {@link KStream} in which each record is re-keyed by {@code mapper}; values pass through unchanged.
+ *
+ * @param mapper the {@link KeyValueMapper} that computes the new key from each record's current key and value
+ * @param <K1> the new key type on the resulting stream
+ */
+ <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper);
+
/**
* Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bb6878f..1e44cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -203,6 +203,15 @@ public interface KTable<K, V> {
KStream<K, V> toStream();
/**
+ * Convert this {@link KTable} to a new instance of {@link KStream}, using the given {@link KeyValueMapper} to select
+ * the new key for each record; values pass through unchanged.
+ *
+ * @param mapper the instance of {@link KeyValueMapper} that computes the new key from each record's key and value
+ * @param <K1> the new key type
+ */
+ <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper);
+
+ /**
* Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join.
*
* @param other the instance of {@link KTable} joined with this stream
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7030021..a84b4aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -86,6 +86,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+ private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
+
public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
@@ -121,6 +123,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
+ // Re-key only: KStreamMap gets a mapper that keeps each record's value and swaps in the key from {@code mapper}.
+ public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+ String name = topology.newName(KEY_SELECT_NAME);
+ topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+ @Override
+ public KeyValue<K1, V> apply(K key, V value) {
+ return new KeyValue<>(mapper.apply(key, value), value);
+ }
+ }), this.name);
+ return new KStreamImpl<>(topology, name, sourceNodes);
+ }
+
+ @Override
public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
String name = topology.newName(MAP_NAME);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f78169e..5c291f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -244,6 +244,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KStreamImpl<>(topology, name, sourceNodes);
}
+ @Override
+ public <K1> KStream<K1, V> toStream(final KeyValueMapper<K, V, K1> mapper) {
+ return this.toStream().selectKey(mapper);
+ }
+
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
new file mode 100644
index 0000000..5f19b9e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamSelectKeyTest {
+
+ private final String topicName = "topic_key_select";
+
+ private final Serde<Integer> integerSerde = Serdes.Integer();
+ private final Serde<String> stringSerde = Serdes.String();
+
+ @Test
+ public void testSelectKey() {
+ KStreamBuilder builder = new KStreamBuilder();
+ // Lookup table the selector uses to derive the new String key from each Integer value.
+ final Map<Integer, String> keyMap = new HashMap<>();
+ keyMap.put(1, "ONE");
+ keyMap.put(2, "TWO");
+ keyMap.put(3, "THREE");
+
+ // Ignores the incoming key (records below are sent with null keys) and keys by value instead.
+ KeyValueMapper<String, Integer, String> selector = new KeyValueMapper<String, Integer, String>() {
+ @Override
+ public String apply(String key, Integer value) {
+ return keyMap.get(value);
+ }
+ };
+
+ final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
+ final int[] expectedValues = new int[]{1, 2, 3};
+
+ KStream<String, Integer> stream = builder.stream(stringSerde, integerSerde, topicName);
+
+ MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();
+
+ stream.selectKey(selector).process(processor);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ // Feed each value with a null key; selectKey must supply the key.
+ for (int expectedValue : expectedValues) {
+ driver.process(topicName, null, expectedValue);
+ }
+
+ assertEquals(expected.length, processor.processed.size());
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], processor.processed.get(i));
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
new file mode 100644
index 0000000..ce1b9d6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableMapKeysTest {
+
+ private final Serde<String> stringSerde = new Serdes.StringSerde();
+ private final Serde<Integer> integerSerde = new Serdes.IntegerSerde();
+
+ @Test
+ public void testMapKeysConvertingToStream() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final String topic1 = "topic_map_keys";
+
+ KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1);
+ // Lookup table the mapper uses to turn each Integer table key into a String stream key.
+ final Map<Integer, String> keyMap = new HashMap<>();
+ keyMap.put(1, "ONE");
+ keyMap.put(2, "TWO");
+ keyMap.put(3, "THREE");
+
+ KeyValueMapper<Integer, String, String> keyMapper = new KeyValueMapper<Integer, String, String>() {
+ @Override
+ public String apply(Integer key, String value) {
+ return keyMap.get(key);
+ }
+ };
+
+ KStream<String, String> convertedStream = table1.toStream(keyMapper);
+
+ final String[] expected = new String[]{"ONE:V_ONE", "TWO:V_TWO", "THREE:V_THREE"};
+ final int[] originalKeys = new int[]{1, 2, 3};
+ final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
+
+ // originalKeys[i] is written to the table with values[i]; expected[i] pairs the
+ // mapped key keyMap.get(originalKeys[i]) with that same, unchanged value.
+ MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>();
+
+ convertedStream.process(processor);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ for (int i = 0; i < originalKeys.length; i++) {
+ driver.process(topic1, originalKeys[i], values[i]);
+ }
+
+ assertEquals(expected.length, processor.processed.size());
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], processor.processed.get(i));
+ }
+ }
+
+
+
+}
\ No newline at end of file