You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/20 02:57:04 UTC
[incubator-heron] branch master updated: Add KVStreamlet and
keyBy() operation (#3125)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 103bfc1 Add KVStreamlet and keyBy() operation (#3125)
103bfc1 is described below
commit 103bfc1515b4380e94301480221e748ba27f879a
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Dec 19 18:57:00 2018 -0800
Add KVStreamlet and keyBy() operation (#3125)
* Add KVStreamlet and keyBy() operation
---
.../org/apache/heron/streamlet/KVStreamlet.java | 29 ++++
.../java/org/apache/heron/streamlet/Streamlet.java | 23 +++-
.../heron/streamlet/impl/KVStreamletImpl.java | 30 +++++
.../apache/heron/streamlet/impl/StreamletImpl.java | 38 +++++-
.../streamlet/impl/operators/KeyByOperator.java | 53 ++++++++
.../GeneralReduceByKeyAndWindowStreamlet.java | 4 +-
.../streamlet/impl/streamlets/JoinStreamlet.java | 4 +-
.../streamlet/impl/streamlets/KeyByStreamlet.java | 57 ++++++++
.../streamlets/ReduceByKeyAndWindowStreamlet.java | 4 +-
.../apache/heron/streamlet/scala/KVStreamlet.scala | 27 ++++
.../apache/heron/streamlet/scala/Streamlet.scala | 14 ++
.../streamlet/scala/impl/KVStreamletImpl.scala | 42 ++++++
.../heron/streamlet/scala/impl/StreamletImpl.scala | 102 +++++++++-----
heron/api/tests/java/BUILD | 1 +
.../heron/streamlet/impl/StreamletImplTest.java | 16 +++
.../impl/operators/KeyByOperatorTest.java | 148 +++++++++++++++++++++
.../streamlet/scala/impl/StreamletImplTest.scala | 28 ++++
17 files changed, 569 insertions(+), 51 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java
new file mode 100644
index 0000000..ae51af3
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.classification.InterfaceStability;
+
+/**
+ * A KVStreamlet is a Streamlet with KeyValue data.
+ */
+@InterfaceStability.Evolving
+public interface KVStreamlet<K, V> extends Streamlet<KeyValue<K, V>> {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index e1f0616..5607e8a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -145,7 +145,7 @@ public interface Streamlet<R> {
* have. Typical windowing strategies are sliding windows and tumbling windows
* @param joinFunction The join function that needs to be applied
*/
- <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+ <K, S, T> KVStreamlet<KeyedWindow<K>, T>
join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
SerializableBiFunction<R, S, ? extends T> joinFunction);
@@ -166,7 +166,7 @@ public interface Streamlet<R> {
* @param joinType Type of Join. Options {@link JoinType}
* @param joinFunction The join function that needs to be applied
*/
- <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+ <K, S, T> KVStreamlet<KeyedWindow<K>, T>
join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction);
@@ -181,7 +181,7 @@ public interface Streamlet<R> {
* Typical windowing strategies are sliding windows and tumbling windows
* @param reduceFn The reduce function that you want to apply to all the values of a key.
*/
- <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
+ <K, V> KVStreamlet<KeyedWindow<K>, V> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn);
@@ -198,7 +198,7 @@ public interface Streamlet<R> {
* @param reduceFn The reduce function takes two parameters: a partial result of the reduction
* and the next element of the stream. It returns a new partial result.
*/
- <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+ <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
T identity, SerializableBiFunction<T, R, ? extends T> reduceFn);
@@ -244,6 +244,21 @@ public interface Streamlet<R> {
Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns);
/**
+ * Return a new KVStreamlet<K, R> by applying key extractor to each element of this Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ */
+ <K> KVStreamlet<K, R> keyBy(SerializableFunction<R, K> keyExtractor);
+
+ /**
+ * Return a new KVStreamlet<K, V> by applying key and value extractor to each element of this
+ * Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+ */
+ <K, V> KVStreamlet<K, V> keyBy(SerializableFunction<R, K> keyExtractor,
+ SerializableFunction<R, V> valueExtractor);
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
new file mode 100644
index 0000000..e9590ac
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl;
+
+import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyValue;
+
+/**
+ * KVStreamletImpl is the abstract base class for Streamlets with KeyValue data.
+ */
+public abstract class KVStreamletImpl<K, V>
+ extends StreamletImpl<KeyValue<K, V>>
+ implements KVStreamlet<K, V> {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 7ca0f43..9ce51b4 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -33,7 +33,7 @@ import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
-import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.KVStreamlet;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableBinaryOperator;
@@ -50,6 +50,7 @@ import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
@@ -114,6 +115,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
FILTER("filter"),
FLATMAP("flatmap"),
JOIN("join"),
+ KEYBY("keyBy"),
LOGGER("logger"),
MAP("map"),
SOURCE("generator"),
@@ -394,7 +396,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
* @param joinFunction The join function that needs to be applied
*/
@Override
- public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+ public <K, S, T> KVStreamlet<KeyedWindow<K>, T>
join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
SerializableBiFunction<R, S, ? extends T> joinFunction) {
@@ -424,7 +426,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
* @param joinFunction The join function that needs to be applied
*/
@Override
- public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+ public <K, S, T> KVStreamlet<KeyedWindow<K>, T>
join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction) {
@@ -454,7 +456,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
* @param reduceFn The reduce function that you want to apply to all the values of a key.
*/
@Override
- public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+ public <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, T> valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<T> reduceFn) {
checkNotNull(keyExtractor, "keyExtractor cannot be null");
@@ -483,7 +485,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
* and the next element of the stream. It returns a new partial result.
*/
@Override
- public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+ public <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
checkNotNull(keyExtractor, "keyExtractor cannot be null");
@@ -616,4 +618,30 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
addChild(splitStreamlet);
return splitStreamlet;
}
+
+ /**
+ * Return a new KVStreamlet<K, R> by applying key extractor to each element of this Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ */
+ @Override
+ public <K> KVStreamlet<K, R> keyBy(SerializableFunction<R, K> keyExtractor) {
+ return keyBy(keyExtractor, (a) -> a);
+ }
+
+ /**
+ * Return a new KVStreamlet<K, V> by applying key and value extractor to each element of this
+ * Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+ */
+ public <K, V> KVStreamlet<K, V> keyBy(SerializableFunction<R, K> keyExtractor,
+ SerializableFunction<R, V> valueExtractor) {
+ checkNotNull(keyExtractor, "keyExtractor cannot be null");
+ checkNotNull(valueExtractor, "valueExtractor cannot be null");
+
+ KeyByStreamlet<R, K, V> retval =
+ new KeyByStreamlet<R, K, V>(this, keyExtractor, valueExtractor);
+ addChild(retval);
+ return retval;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java
new file mode 100644
index 0000000..911c081
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.SerializableFunction;
+
+/**
+ * KeyByOperator is the class that implements keyBy functionality.
+ * It takes in a key extractor and a value extractor as input.
+ * For every tuple, the bolt converts the element into a key-value pair tuple
+ * by applying the extractors.
+ */
+public class KeyByOperator<R, K, V> extends StreamletOperator<R, KeyValue<K, V>> {
+ private SerializableFunction<R, K> keyExtractor;
+ private SerializableFunction<R, V> valueExtractor;
+
+ public KeyByOperator(SerializableFunction<R, K> keyExtractor,
+ SerializableFunction<R, V> valueExtractor) {
+ this.keyExtractor = keyExtractor;
+ this.valueExtractor = valueExtractor;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void execute(Tuple tuple) {
+ R obj = (R) tuple.getValue(0);
+ K key = keyExtractor.apply(obj);
+ V value = valueExtractor.apply(obj);
+
+ collector.emit(new Values(new KeyValue<>(key, value)));
+ collector.ack(tuple);
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index f95d7d2..c2b8747 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOper
* KeyWindowInfo<K> type and the value is of type T.
*/
public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
- extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+ extends KVStreamletImpl<KeyedWindow<K>, T> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 40f76ef..ce03f97 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -24,11 +24,11 @@ import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.JoinType;
-import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.JoinOperator;
* JoinStreamlet's elements are of KeyValue type where the key is KeyWindowInfo<K> type
* and the value is of type VR.
*/
-public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+public final class JoinStreamlet<K, R, S, T> extends KVStreamletImpl<KeyedWindow<K>, T> {
private JoinType joinType;
private StreamletImpl<R> left;
private StreamletImpl<S> right;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
new file mode 100644
index 0000000..df76eab
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.operators.KeyByOperator;
+
+/**
+ * KeyByStreamlet represents a KVStreamlet that is the result of applying key and value extractors
+ * on all elements.
+ */
+public class KeyByStreamlet<R, K, V> extends KVStreamletImpl<K, V> {
+ private StreamletImpl<R> parent;
+ private SerializableFunction<R, K> keyExtractor;
+ private SerializableFunction<R, V> valueExtractor;
+
+ public KeyByStreamlet(StreamletImpl<R> parent,
+ SerializableFunction<R, K> keyExtractor,
+ SerializableFunction<R, V> valueExtractor) {
+ this.parent = parent;
+ this.keyExtractor = keyExtractor;
+ this.valueExtractor = valueExtractor;
+ setNumPartitions(parent.getNumPartitions());
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.KEYBY, stageNames);
+ KeyByOperator<R, K, V> bolt = new KeyByOperator<>(keyExtractor, valueExtractor);
+ bldr.setBolt(getName(), bolt, getNumPartitions())
+ .shuffleGrouping(parent.getName(), parent.getStreamId());
+ return true;
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 18fea4d..2c399f3 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
* KeyWindowInfo<K> type and the value is of type T.
*/
public class ReduceByKeyAndWindowStreamlet<R, K, T>
- extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+ extends KVStreamletImpl<KeyedWindow<K>, T> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala
new file mode 100644
index 0000000..5b37033
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala
+
+import org.apache.heron.streamlet.KeyValue
+
+/**
+ * A KVStreamlet is a Streamlet with KeyValue data.
+ */
+trait KVStreamlet[K, V] extends Streamlet[KeyValue[K, V]] {
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 9945852..46a6c37 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -265,6 +265,20 @@ trait Streamlet[R] {
def split(splitFns: Map[String, R => Boolean]): Streamlet[R]
/**
+ * Return a new KVStreamlet<K, R> by applying key extractor to each element of this Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ */
+ def keyBy[K](keyExtractor: R => K): KVStreamlet[K, R]
+
+ /**
+ * Return a new KVStreamlet<K, T> by applying key and value extractor to each element of this
+ * Streamlet
+ * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+ * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+ */
+ def keyBy[K, T](keyExtractor: R => K, valueExtractor: R => T): KVStreamlet[K, T]
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala
new file mode 100644
index 0000000..111ba59
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala.impl
+
+import org.apache.heron.streamlet.{
+ KVStreamlet => JavaKVStreamlet,
+ Streamlet => JavaStreamlet
+}
+import org.apache.heron.streamlet.scala.KVStreamlet
+
+object KVStreamletImpl {
+ def fromJavaKVStreamlet[K, V](javaKVStreamlet: JavaKVStreamlet[K, V]): KVStreamlet[K, V] =
+ new KVStreamletImpl[K, V](javaKVStreamlet)
+
+ def toJavaKVStreamlet[K, V](streamlet: KVStreamlet[K, V]): JavaKVStreamlet[K, V] =
+ streamlet.asInstanceOf[KVStreamletImpl[K, V]].javaKVStreamlet
+}
+
+/**
+ * This class provides the Scala KVStreamlet implementation by wrapping the Java KVStreamlet API.
+ * It inherits its operations from the Scala StreamletImpl and keeps a reference to the wrapped
+ * Java KVStreamlet so that it can be converted back via KVStreamletImpl.toJavaKVStreamlet.
+ */
+class KVStreamletImpl[K, V](val javaKVStreamlet: JavaKVStreamlet[K, V])
+ extends StreamletImpl(javaKVStreamlet) with KVStreamlet[K, V] {
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index fc308b5..84b04e2 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -27,18 +27,22 @@ import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
IStreamletOperator,
JoinType,
- KeyValue,
KeyedWindow,
SerializablePredicate,
+ KVStreamlet => JavaKVStreamlet,
Streamlet => JavaStreamlet,
WindowConfig
}
-import org.apache.heron.streamlet.impl.{StreamletImpl => JavaStreamletImpl}
+import org.apache.heron.streamlet.impl.{
+ KVStreamletImpl => JavaKVStreamletImpl,
+ StreamletImpl => JavaStreamletImpl
+}
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
import org.apache.heron.streamlet.scala.{
SerializableTransformer,
Sink,
+ KVStreamlet,
Streamlet
}
import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._
@@ -204,18 +208,18 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
thisKeyExtractor: R => K,
otherKeyExtractor: S => K,
windowCfg: WindowConfig,
- joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+ joinFunction: (R, S) => T): KVStreamlet[KeyedWindow[K], T] = {
val javaOtherStreamlet = toJavaStreamlet[S](other)
val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)
- val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
- javaThisKeyExtractor,
- javaOtherKeyExtractor,
- windowCfg,
- javaJoinFunction)
- fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+ val newJavaKVStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
+ javaThisKeyExtractor,
+ javaOtherKeyExtractor,
+ windowCfg,
+ javaJoinFunction)
+ KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
}
/**
@@ -240,19 +244,19 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
otherKeyExtractor: S => K,
windowCfg: WindowConfig,
joinType: JoinType,
- joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+ joinFunction: (R, S) => T): KVStreamlet[KeyedWindow[K], T] = {
val javaOtherStreamlet = toJavaStreamlet[S](other)
val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)
- val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
- javaThisKeyExtractor,
- javaOtherKeyExtractor,
- windowCfg,
- joinType,
- javaJoinFunction)
- fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+ val newJavaKVStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
+ javaThisKeyExtractor,
+ javaOtherKeyExtractor,
+ windowCfg,
+ joinType,
+ javaJoinFunction)
+ KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
}
/**
@@ -266,21 +270,21 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
* Typical windowing strategies are sliding windows and tumbling windows
* @param reduceFn The reduce function that you want to apply to all the values of a key.
*/
- override def reduceByKeyAndWindow[K, V](
+ override def reduceByKeyAndWindow[K, T](
keyExtractor: R => K,
- valueExtractor: R => V,
+ valueExtractor: R => T,
windowCfg: WindowConfig,
- reduceFn: (V, V) => V): Streamlet[KeyValue[KeyedWindow[K], V]] = {
+ reduceFn: (T, T) => T): KVStreamlet[KeyedWindow[K], T] = {
val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
- val javaValueExtractor = toSerializableFunction[R, V](valueExtractor)
- val javaReduceFunction = toSerializableBinaryOperator[V](reduceFn)
-
- val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, V](
- javaKeyExtractor,
- javaValueExtractor,
- windowCfg,
- javaReduceFunction)
- fromJavaStreamlet[KeyValue[KeyedWindow[K], V]](newJavaStreamlet)
+ val javaValueExtractor = toSerializableFunction[R, T](valueExtractor)
+ val javaReduceFunction = toSerializableBinaryOperator[T](reduceFn)
+
+ val newJavaKVStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
+ javaKeyExtractor,
+ javaValueExtractor,
+ windowCfg,
+ javaReduceFunction)
+ KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
}
/**
@@ -301,16 +305,16 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
keyExtractor: R => K,
windowCfg: WindowConfig,
identity: T,
- reduceFn: (T, R) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+ reduceFn: (T, R) => T): KVStreamlet[KeyedWindow[K], T] = {
val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
val javaReduceFunction = toSerializableBiFunction[T, R, T](reduceFn)
- val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
- javaKeyExtractor,
- windowCfg,
- identity,
- javaReduceFunction)
- fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+ val newJavaKVStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
+ javaKeyExtractor,
+ windowCfg,
+ identity,
+ javaReduceFunction)
+ KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
}
/**
@@ -379,6 +383,32 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
fromJavaStreamlet[R](newJavaStreamlet)
}
+
+  /**
+    * Key this Streamlet: the key extractor is applied to each element and the result is
+    * exposed as a KVStreamlet[K, R] (the value type stays the element type R).
+    * @param keyExtractor Function that computes the key for an element of this streamlet
+    */
+  override def keyBy[K](keyExtractor: R => K): KVStreamlet[K, R] =
+    KVStreamletImpl.fromJavaKVStreamlet[K, R](
+      javaStreamlet.keyBy[K](toSerializableFunction[R, K](keyExtractor)))
+
+  /**
+    * Key this Streamlet with separate key and value projections: both extractors are applied
+    * to each element and the result is exposed as a KVStreamlet[K, T].
+    * @param keyExtractor Function that computes the key for an element of this streamlet
+    * @param valueExtractor Function that computes the value for an element of this streamlet
+    */
+  override def keyBy[K, T](keyExtractor: R => K, valueExtractor: R => T): KVStreamlet[K, T] = {
+    // Convert the Scala functions at the call site; key first, then value, as before
+    val javaKVStreamlet = javaStreamlet.keyBy[K, T](
+      toSerializableFunction[R, K](keyExtractor),
+      toSerializableFunction[R, T](valueExtractor))
+    KVStreamletImpl.fromJavaKVStreamlet[K, T](javaKVStreamlet)
+  }
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index 03c1f2b..d8efbdf 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -29,6 +29,7 @@ java_tests(
"org.apache.heron.api.bolt.BaseWindowedBoltTest",
"org.apache.heron.streamlet.impl.StreamletImplTest",
"org.apache.heron.streamlet.impl.operators.JoinOperatorTest",
+ "org.apache.heron.streamlet.impl.operators.KeyByOperatorTest",
"org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 5d9053a..d249664 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -44,6 +44,7 @@ import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.IStreamletBasicOperator;
import org.apache.heron.streamlet.IStreamletRichOperator;
import org.apache.heron.streamlet.IStreamletWindowOperator;
+import org.apache.heron.streamlet.KVStreamlet;
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.SerializablePredicate;
import org.apache.heron.streamlet.SerializableTransformer;
@@ -55,6 +56,7 @@ import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
@@ -353,6 +355,20 @@ public class StreamletImplTest {
}
@Test
+  @SuppressWarnings("unchecked")
+  public void testKeyByStreamlet() {
+    // Key a stream of random doubles by their rounded value
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<Long, Double> kvStream = baseStreamlet.keyBy(x -> Math.round(x));
+
+    // keyBy() is expected to produce a KeyByStreamlet; no partition count was configured,
+    // so the test expects 1
+    assertTrue(kvStream instanceof KeyByStreamlet);
+    KeyByStreamlet<Double, Long, Double> mStreamlet =
+        (KeyByStreamlet<Double, Long, Double>) kvStream;
+    assertEquals(1, mStreamlet.getNumPartitions());
+
+    // The keyed streamlet must be the only child of the source.
+    // assertEquals takes (expected, actual): keep that order so failure messages read correctly.
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(1, supplierStreamlet.getChildren().size());
+    assertEquals(kvStream, supplierStreamlet.getChildren().get(0));
+  }
+
+ @Test
@SuppressWarnings("unchecked")
public void testSimpleBuild() throws Exception {
Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java
new file mode 100644
index 0000000..4f88d05
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IOutputCollector;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.common.utils.topology.TopologyContextImpl;
+import org.apache.heron.common.utils.tuple.TupleImpl;
+import org.apache.heron.streamlet.KeyValue;
+
+/**
+ * Unit test for {@link KeyByOperator}: feeds string tuples through the operator and checks
+ * the {@link KeyValue} pairs handed to a stub output collector.
+ */
+public class KeyByOperatorTest {
+
+  // Every object passed to the stub collector's emit(), in emission order
+  private List<Object> emittedTuples;
+
+  @Before
+  public void setUp() {
+    emittedTuples = new LinkedList<>();
+  }
+
+  /**
+   * Runs "0", "1" and "2" through the operator and expects one KeyValue per input, whose
+   * value is the parsed integer and whose key is that integer's parity.
+   */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testKeyByOperator() {
+    KeyByOperator<String, String, Integer> keyByOperator = getKeyByOperator();
+
+    TopologyAPI.StreamId componentStreamId
+        = TopologyAPI.StreamId.newBuilder()
+            .setComponentName("sourceComponent").setId("default").build();
+
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("0")));
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("1")));
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("2")));
+
+    Assert.assertEquals(3, emittedTuples.size());
+    String[] parity = {"even", "odd"};
+    // Inputs were executed in the order "0", "1", "2"; the stub collector records emissions
+    // in call order, so the values are expected to come out as 0, 1, 2.
+    int expectedValue = 0;
+    for (Object object : emittedTuples) {
+      KeyValue<String, Integer> tuple = (KeyValue<String, Integer>) object;
+      Assert.assertEquals(Integer.valueOf(expectedValue), tuple.getValue());
+      Assert.assertEquals(parity[expectedValue % 2], tuple.getKey());
+      expectedValue++;
+    }
+  }
+
+  /**
+   * Builds a KeyByOperator that keys each numeric string by parity ("even"/"odd") and uses
+   * the parsed integer as the value, prepared with a stub collector that appends everything
+   * it is asked to emit to {@code emittedTuples}.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private KeyByOperator<String, String, Integer> getKeyByOperator() {
+    KeyByOperator<String, String, Integer> keyByOperator =
+        new KeyByOperator<String, String, Integer>(
+            x -> (Integer.valueOf(x) % 2 == 0) ? "even" : "odd",
+            x -> Integer.valueOf(x));
+
+    keyByOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class),
+        new OutputCollector(new IOutputCollector() {
+
+          @Override
+          public void reportError(Throwable error) {
+          }
+
+          @Override
+          public List<Integer> emit(String streamId,
+                                    Collection<Tuple> anchors, List<Object> tuple) {
+            // Record the emitted fields; the test inspects them afterwards
+            emittedTuples.addAll(tuple);
+            return null;
+          }
+
+          @Override
+          public void emitDirect(int taskId, String streamId,
+                                 Collection<Tuple> anchors, List<Object> tuple) {
+          }
+
+          @Override
+          public void ack(Tuple input) {
+          }
+
+          @Override
+          public void fail(Tuple input) {
+          }
+        }));
+    return keyByOperator;
+  }
+
+  /**
+   * Wraps the given values in a TupleImpl whose source stream is always reported as
+   * component "sourceComponent", stream "default".
+   */
+  private Tuple getTuple(TopologyAPI.StreamId streamId, final Fields fields, Values values) {
+
+    TopologyContext topologyContext = getContext(fields);
+    return new TupleImpl(topologyContext, streamId, 0,
+        null, values, 1) {
+      @Override
+      public TopologyAPI.StreamId getSourceGlobalStreamId() {
+        return TopologyAPI.StreamId.newBuilder().setComponentName("sourceComponent")
+            .setId("default").build();
+      }
+    };
+  }
+
+  /**
+   * Creates a TopologyContextImpl over an empty topology named "test" that reports the
+   * given fields as the output fields of every component and stream.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private TopologyContext getContext(final Fields fields) {
+    TopologyBuilder builder = new TopologyBuilder();
+    return new TopologyContextImpl(new Config(),
+        builder.createTopology()
+            .setConfig(new Config())
+            .setName("test")
+            .setState(TopologyAPI.TopologyState.RUNNING)
+            .getTopology(),
+        new HashMap(), 1, null) {
+      @Override
+      public Fields getComponentOutputFields(
+          String componentId, String streamId) {
+        return fields;
+      }
+
+    };
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 5664d64..76de00c 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -40,6 +40,7 @@ import org.apache.heron.streamlet.impl.streamlets.{
FlatMapStreamlet,
LogStreamlet,
JoinStreamlet,
+ KeyByStreamlet,
MapStreamlet,
ReduceByKeyAndWindowStreamlet,
RemapStreamlet,
@@ -638,6 +639,33 @@ class StreamletImplTest extends BaseFunSuite {
assertEquals(0, mapStreamlet.getChildren.size())
}
+  test("StreamletImpl should support keyBy operation") {
+    val source = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    // Key each element by its remainder modulo 3 (three groups); the value is the element itself
+    source
+      .keyBy[Int, Int]((key: Int) => key % 3, (value: Int) => value)
+      .setName("KeyBy_Streamlet_1")
+      .setNumPartitions(5)
+
+    // The keyed streamlet must be the single child hanging off the source
+    val sourceImpl = source.asInstanceOf[StreamletImpl[Int]]
+    assertEquals(1, sourceImpl.getChildren.size)
+
+    val onlyChild = sourceImpl.getChildren(0)
+    assertTrue(onlyChild.isInstanceOf[KeyByStreamlet[_, _, _]])
+
+    // Name and partition count set on the Scala wrapper must be visible on the child
+    val keyByStreamlet = onlyChild.asInstanceOf[KeyByStreamlet[Int, Int, Int]]
+    assertEquals("KeyBy_Streamlet_1", keyByStreamlet.getName)
+    assertEquals(5, keyByStreamlet.getNumPartitions)
+    assertEquals(0, keyByStreamlet.getChildren.size())
+  }
+
private def verifyClonedStreamlets[R](supplierStreamlet: Streamlet[R],
numClones: Int): Unit = {
val supplierStreamletImpl =