You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/17 05:18:30 UTC

kafka git commit: KAFKA-3430: Allow users to set key in KTable.toStream and in KStream.

Repository: kafka
Updated Branches:
  refs/heads/trunk 5236bf60d -> b1691cf49


KAFKA-3430: Allow users to set key in KTable.toStream and in KStream.

… With KStream, the method selectKey was added to enable deriving a key from values before performing aggregation-by-key operations on original streams that have null keys.

Author: bbejeck <bb...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1222 from bbejeck/KAFKA-3430_allow_users_to_set_key_KTable_toStream


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1691cf4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1691cf4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1691cf4

Branch: refs/heads/trunk
Commit: b1691cf49e9de850ac8a2675c487af9fb60bfdaa
Parents: 5236bf6
Author: Bill Bejeck <bb...@gmail.com>
Authored: Sat Apr 16 20:18:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Apr 16 20:18:27 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  9 ++
 .../apache/kafka/streams/kstream/KTable.java    |  9 ++
 .../streams/kstream/internals/KStreamImpl.java  | 15 ++++
 .../streams/kstream/internals/KTableImpl.java   |  5 ++
 .../kstream/internals/KStreamSelectKeyTest.java | 83 ++++++++++++++++++
 .../kstream/internals/KTableMapKeysTest.java    | 88 ++++++++++++++++++++
 6 files changed, 209 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 27475aa..7e3562c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -46,6 +46,15 @@ public interface KStream<K, V> {
      */
     KStream<K, V> filterNot(Predicate<K, V> predicate);
 
+
+    /**
+     * Create a new key from the current key and value.
+     *
+     * @param mapper  the instance of {@link KeyValueMapper}
+     * @param <K1>   the new key type on the stream
+     */
+    <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper);
+
     /**
      * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bb6878f..1e44cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -203,6 +203,15 @@ public interface KTable<K, V> {
     KStream<K, V> toStream();
 
     /**
+     *  Convert this stream to a new instance of {@link KStream} using the given {@link KeyValueMapper} to select
+     *  the new key.
+     *
+     * @param mapper  the instance of {@link KeyValueMapper}
+     * @param <K1> the new key type
+     */
+    <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper);
+
+    /**
      * Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join.
      *
      * @param other         the instance of {@link KTable} joined with this stream

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7030021..a84b4aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -86,6 +86,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
 
+    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
+
     public static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
@@ -121,6 +123,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    @SuppressWarnings("unchecked")
+    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+        String name = topology.newName(KEY_SELECT_NAME);
+        topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+            @Override
+            public KeyValue<K1, V> apply(K key, V value) {
+                return new KeyValue(mapper.apply(key, value), value);
+            }
+        }), this.name);
+        return new KStreamImpl<>(topology, name, sourceNodes);
+    }
+
+    @Override
     public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
         String name = topology.newName(MAP_NAME);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f78169e..5c291f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -244,6 +244,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KStreamImpl<>(topology, name, sourceNodes);
     }
 
+    @Override
+    public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper) {
+        return toStream().selectKey(mapper);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
new file mode 100644
index 0000000..5f19b9e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamSelectKeyTest {
+
+    private String topicName = "topic_key_select";
+
+    final private Serde<Integer> integerSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void testSelectKey() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Map<Integer, String> keyMap = new HashMap<>();
+        keyMap.put(1, "ONE");
+        keyMap.put(2, "TWO");
+        keyMap.put(3, "THREE");
+
+
+        KeyValueMapper<String, Integer, String> selector = new KeyValueMapper<String, Integer, String>() {
+            @Override
+            public String apply(String key, Integer value) {
+                return keyMap.get(value);
+            }
+        };
+
+        final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
+        final int[] expectedValues = new int[]{1, 2, 3};
+
+        KStream<String, Integer>  stream = builder.stream(stringSerde, integerSerde, topicName);
+
+        MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();
+
+        stream.selectKey(selector).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        for (int expectedValue : expectedValues) {
+            driver.process(topicName, null, expectedValue);
+        }
+
+        assertEquals(3, processor.processed.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1691cf4/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
new file mode 100644
index 0000000..ce1b9d6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableMapKeysTest {
+
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
+
+    @Test
+    public void testMapKeysConvertingToStream() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic_map_keys";
+
+        KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1);
+
+        final Map<Integer, String> keyMap = new HashMap<>();
+        keyMap.put(1, "ONE");
+        keyMap.put(2, "TWO");
+        keyMap.put(3, "THREE");
+
+        KeyValueMapper<Integer, String, String> keyMapper = new KeyValueMapper<Integer, String, String>() {
+            @Override
+            public  String apply(Integer key, String value) {
+                return keyMap.get(key);
+            }
+        };
+
+        KStream<String, String> convertedStream = table1.toStream(keyMapper);
+
+        final String[] expected = new String[]{"ONE:V_ONE", "TWO:V_TWO", "THREE:V_THREE"};
+        final int[] originalKeys = new int[]{1, 2, 3};
+        final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
+
+
+
+        MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>();
+
+        convertedStream.process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        for (int i = 0;  i < originalKeys.length; i++) {
+            driver.process(topic1, originalKeys[i], values[i]);
+        }
+
+        assertEquals(3, processor.processed.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+
+
+}
\ No newline at end of file