You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/20 02:57:04 UTC

[incubator-heron] branch master updated: Add KVStreamlet and keyBy() operation (#3125)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 103bfc1  Add KVStreamlet and keyBy() operation (#3125)
103bfc1 is described below

commit 103bfc1515b4380e94301480221e748ba27f879a
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Dec 19 18:57:00 2018 -0800

    Add KVStreamlet and keyBy() operation (#3125)
    
    * Add KVStreamlet and keyBy() operation
---
 .../org/apache/heron/streamlet/KVStreamlet.java    |  29 ++++
 .../java/org/apache/heron/streamlet/Streamlet.java |  23 +++-
 .../heron/streamlet/impl/KVStreamletImpl.java      |  30 +++++
 .../apache/heron/streamlet/impl/StreamletImpl.java |  38 +++++-
 .../streamlet/impl/operators/KeyByOperator.java    |  53 ++++++++
 .../GeneralReduceByKeyAndWindowStreamlet.java      |   4 +-
 .../streamlet/impl/streamlets/JoinStreamlet.java   |   4 +-
 .../streamlet/impl/streamlets/KeyByStreamlet.java  |  57 ++++++++
 .../streamlets/ReduceByKeyAndWindowStreamlet.java  |   4 +-
 .../apache/heron/streamlet/scala/KVStreamlet.scala |  27 ++++
 .../apache/heron/streamlet/scala/Streamlet.scala   |  14 ++
 .../streamlet/scala/impl/KVStreamletImpl.scala     |  42 ++++++
 .../heron/streamlet/scala/impl/StreamletImpl.scala | 102 +++++++++-----
 heron/api/tests/java/BUILD                         |   1 +
 .../heron/streamlet/impl/StreamletImplTest.java    |  16 +++
 .../impl/operators/KeyByOperatorTest.java          | 148 +++++++++++++++++++++
 .../streamlet/scala/impl/StreamletImplTest.scala   |  28 ++++
 17 files changed, 569 insertions(+), 51 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java
new file mode 100644
index 0000000..ae51af3
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/KVStreamlet.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.classification.InterfaceStability;
+
+/**
+ * A KVStreamlet is a Streamlet with KeyValue data.
+ */
+@InterfaceStability.Evolving
+public interface KVStreamlet<K, V> extends Streamlet<KeyValue<K, V>> {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index e1f0616..5607e8a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -145,7 +145,7 @@ public interface Streamlet<R> {
    * have. Typical windowing strategies are sliding windows and tumbling windows
    * @param joinFunction The join function that needs to be applied
    */
-  <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+  <K, S, T> KVStreamlet<KeyedWindow<K>, T>
         join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
              SerializableBiFunction<R, S, ? extends T> joinFunction);
@@ -166,7 +166,7 @@ public interface Streamlet<R> {
    * @param joinType Type of Join. Options {@link JoinType}
    * @param joinFunction The join function that needs to be applied
    */
-  <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+  <K, S, T> KVStreamlet<KeyedWindow<K>, T>
         join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
              JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction);
@@ -181,7 +181,7 @@ public interface Streamlet<R> {
    * Typical windowing strategies are sliding windows and tumbling windows
    * @param reduceFn The reduce function that you want to apply to all the values of a key.
    */
-  <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
+  <K, V> KVStreamlet<KeyedWindow<K>, V> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor,
       WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn);
 
@@ -198,7 +198,7 @@ public interface Streamlet<R> {
    * @param reduceFn The reduce function takes two parameters: a partial result of the reduction
    * and the next element of the stream. It returns a new partial result.
    */
-  <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+  <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
       T identity, SerializableBiFunction<T, R, ? extends T> reduceFn);
 
@@ -244,6 +244,21 @@ public interface Streamlet<R> {
   Streamlet<R> split(Map<String, SerializablePredicate<R>> splitFns);
 
   /**
+   * Return a new KVStreamlet&lt;K, R&gt; by applying a key extractor to each Streamlet element
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  <K> KVStreamlet<K, R> keyBy(SerializableFunction<R, K> keyExtractor);
+
+  /**
+   * Return a new KVStreamlet&lt;K, V&gt; by applying key and value extractors to each element of
+   * this Streamlet
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   */
+  <K, V> KVStreamlet<K, V> keyBy(SerializableFunction<R, K> keyExtractor,
+                                 SerializableFunction<R, V> valueExtractor);
+
+  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
new file mode 100644
index 0000000..e9590ac
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl;
+
+import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyValue;
+
+/**
+ * KVStreamletImpl is the abstract base class for KVStreamlet implementations (KeyValue data).
+ */
+public abstract class KVStreamletImpl<K, V>
+    extends StreamletImpl<KeyValue<K, V>>
+    implements KVStreamlet<K, V> {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 7ca0f43..9ce51b4 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -33,7 +33,7 @@ import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
-import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.KVStreamlet;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
@@ -50,6 +50,7 @@ import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
@@ -114,6 +115,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     FILTER("filter"),
     FLATMAP("flatmap"),
     JOIN("join"),
+    KEYBY("keyBy"),
     LOGGER("logger"),
     MAP("map"),
     SOURCE("generator"),
@@ -394,7 +396,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
    * @param joinFunction The join function that needs to be applied
    */
   @Override
-  public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+  public <K, S, T> KVStreamlet<KeyedWindow<K>, T>
         join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
              SerializableBiFunction<R, S, ? extends T> joinFunction) {
@@ -424,7 +426,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
    * @param joinFunction The join function that needs to be applied
    */
   @Override
-  public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
+  public <K, S, T> KVStreamlet<KeyedWindow<K>, T>
         join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
              JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction) {
@@ -454,7 +456,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
    * @param reduceFn The reduce function that you want to apply to all the values of a key.
    */
   @Override
-  public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+  public <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, SerializableFunction<R, T> valueExtractor,
       WindowConfig windowCfg, SerializableBinaryOperator<T> reduceFn) {
     checkNotNull(keyExtractor, "keyExtractor cannot be null");
@@ -483,7 +485,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
    * and the next element of the stream. It returns a new partial result.
    */
   @Override
-  public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+  public <K, T> KVStreamlet<KeyedWindow<K>, T> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
       T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
     checkNotNull(keyExtractor, "keyExtractor cannot be null");
@@ -616,4 +618,30 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     addChild(splitStreamlet);
     return splitStreamlet;
   }
+
+  /**
+   * Return a new KVStreamlet&lt;K, R&gt; by applying a key extractor to each Streamlet element
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  @Override
+  public <K> KVStreamlet<K, R> keyBy(SerializableFunction<R, K> keyExtractor) {
+    return keyBy(keyExtractor, (a) -> a);
+  }
+
+  /**
+   * Return a new KVStreamlet&lt;K, V&gt; by applying key and value extractors to each element of
+   * this Streamlet
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   */
+  public <K, V> KVStreamlet<K, V> keyBy(SerializableFunction<R, K> keyExtractor,
+                                        SerializableFunction<R, V> valueExtractor) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(valueExtractor, "valueExtractor cannot be null");
+
+    KeyByStreamlet<R, K, V> retval =
+        new KeyByStreamlet<R, K, V>(this, keyExtractor, valueExtractor);
+    addChild(retval);
+    return retval;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java
new file mode 100644
index 0000000..911c081
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/KeyByOperator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.SerializableFunction;
+
+/**
+ * KeyByOperator is the class that implements keyBy functionality.
+ * It takes in a key extractor and a value extractor as input.
+ * For every tuple, the bolt converts the incoming element to a key-value pair tuple
+ * by applying the extractors.
+ */
+public class KeyByOperator<R, K, V> extends StreamletOperator<R, KeyValue<K, V>> {
+  private SerializableFunction<R, K> keyExtractor;
+  private SerializableFunction<R, V> valueExtractor;
+
+  public KeyByOperator(SerializableFunction<R, K> keyExtractor,
+                       SerializableFunction<R, V> valueExtractor) {
+    this.keyExtractor = keyExtractor;
+    this.valueExtractor = valueExtractor;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    R obj = (R) tuple.getValue(0);
+    K key = keyExtractor.apply(obj);
+    V value = valueExtractor.apply(obj);
+
+    collector.emit(new Values(new KeyValue<>(key, value)));
+    collector.ack(tuple);
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index f95d7d2..c2b8747 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOper
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
-    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+    extends KVStreamletImpl<KeyedWindow<K>, T> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 40f76ef..ce03f97 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -24,11 +24,11 @@ import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.JoinType;
-import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.JoinOperator;
  * JoinStreamlet's elements are of KeyValue type where the key is KeyWindowInfo&lt;K&gt; type
  * and the value is of type VR.
  */
-public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+public final class JoinStreamlet<K, R, S, T> extends KVStreamletImpl<KeyedWindow<K>, T> {
   private JoinType joinType;
   private StreamletImpl<R> left;
   private StreamletImpl<S> right;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
new file mode 100644
index 0000000..df76eab
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.operators.KeyByOperator;
+
+/**
+ * KeyByStreamlet represents a KVStreamlet that is the result of applying key and value extractors
+ * on all elements.
+ */
+public class KeyByStreamlet<R, K, V> extends KVStreamletImpl<K, V> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
+  private SerializableFunction<R, V> valueExtractor;
+
+  public KeyByStreamlet(StreamletImpl<R> parent,
+                        SerializableFunction<R, K> keyExtractor,
+                        SerializableFunction<R, V> valueExtractor) {
+    this.parent = parent;
+    this.keyExtractor = keyExtractor;
+    this.valueExtractor = valueExtractor;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.KEYBY, stageNames);
+    KeyByOperator<R, K, V> bolt = new KeyByOperator<>(keyExtractor, valueExtractor);
+    bldr.setBolt(getName(), bolt, getNumPartitions())
+        .shuffleGrouping(parent.getName(), parent.getStreamId());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 18fea4d..2c399f3 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class ReduceByKeyAndWindowStreamlet<R, K, T>
-    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+    extends KVStreamletImpl<KeyedWindow<K>, T> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala
new file mode 100644
index 0000000..5b37033
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/KVStreamlet.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala
+
+import org.apache.heron.streamlet.KeyValue
+
+/**
+ * A KVStreamlet is a Streamlet with KeyValue data.
+ */
+trait KVStreamlet[K, V] extends Streamlet[KeyValue[K, V]] {
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 9945852..46a6c37 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -265,6 +265,20 @@ trait Streamlet[R] {
   def split(splitFns: Map[String, R => Boolean]): Streamlet[R]
 
   /**
+   * Return a new KVStreamlet[K, R] by applying a key extractor to each element of this Streamlet
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  def keyBy[K](keyExtractor: R => K): KVStreamlet[K, R]
+
+  /**
+   * Return a new KVStreamlet[K, T] by applying key and value extractors to each element of this
+   * Streamlet
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   */
+  def keyBy[K, T](keyExtractor: R => K, valueExtractor: R => T): KVStreamlet[K, T]
+
+  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala
new file mode 100644
index 0000000..111ba59
--- /dev/null
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/KVStreamletImpl.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.scala.impl
+
+import org.apache.heron.streamlet.{
+  KVStreamlet => JavaKVStreamlet,
+  Streamlet => JavaStreamlet
+}
+import org.apache.heron.streamlet.scala.KVStreamlet
+
+object KVStreamletImpl {
+  def fromJavaKVStreamlet[K, V](javaKVStreamlet: JavaKVStreamlet[K, V]): KVStreamlet[K, V] =
+    new KVStreamletImpl[K, V](javaKVStreamlet)
+
+  def toJavaKVStreamlet[K, V](streamlet: KVStreamlet[K, V]): JavaKVStreamlet[K, V] =
+    streamlet.asInstanceOf[KVStreamletImpl[K, V]].javaKVStreamlet
+}
+
+/**
+ * This class provides the Scala KVStreamlet implementation by wrapping a Java KVStreamlet.
+ * User-defined Scala functions are transformed to the related FunctionalInterface versions, and
+ * the resulting Java Streamlet is transformed back to its Scala version.
+ */
+class KVStreamletImpl[K, V](val javaKVStreamlet: JavaKVStreamlet[K, V])
+    extends StreamletImpl(javaKVStreamlet) with KVStreamlet[K, V] {
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index fc308b5..84b04e2 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -27,18 +27,22 @@ import org.apache.heron.api.grouping.StreamGrouping
 import org.apache.heron.streamlet.{
   IStreamletOperator,
   JoinType,
-  KeyValue,
   KeyedWindow,
   SerializablePredicate,
+  KVStreamlet => JavaKVStreamlet,
   Streamlet => JavaStreamlet,
   WindowConfig
 }
-import org.apache.heron.streamlet.impl.{StreamletImpl => JavaStreamletImpl}
+import org.apache.heron.streamlet.impl.{
+  KVStreamletImpl => JavaKVStreamletImpl,
+  StreamletImpl => JavaStreamletImpl
+}
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
 
 import org.apache.heron.streamlet.scala.{
   SerializableTransformer,
   Sink,
+  KVStreamlet,
   Streamlet
 }
 import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._
@@ -204,18 +208,18 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
       thisKeyExtractor: R => K,
       otherKeyExtractor: S => K,
       windowCfg: WindowConfig,
-      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+      joinFunction: (R, S) => T): KVStreamlet[KeyedWindow[K], T] = {
     val javaOtherStreamlet = toJavaStreamlet[S](other)
     val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
     val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
     val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)
 
-    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
-                                                       javaThisKeyExtractor,
-                                                       javaOtherKeyExtractor,
-                                                       windowCfg,
-                                                       javaJoinFunction)
-    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+    val newJavaKVStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
+                                                         javaThisKeyExtractor,
+                                                         javaOtherKeyExtractor,
+                                                         windowCfg,
+                                                         javaJoinFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
   }
 
   /**
@@ -240,19 +244,19 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
       otherKeyExtractor: S => K,
       windowCfg: WindowConfig,
       joinType: JoinType,
-      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+      joinFunction: (R, S) => T): KVStreamlet[KeyedWindow[K], T] = {
     val javaOtherStreamlet = toJavaStreamlet[S](other)
     val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
     val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
     val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)
 
-    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
-                                                       javaThisKeyExtractor,
-                                                       javaOtherKeyExtractor,
-                                                       windowCfg,
-                                                       joinType,
-                                                       javaJoinFunction)
-    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+    val newJavaKVStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
+                                                         javaThisKeyExtractor,
+                                                         javaOtherKeyExtractor,
+                                                         windowCfg,
+                                                         joinType,
+                                                         javaJoinFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
   }
 
   /**
@@ -266,21 +270,21 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     *                       Typical windowing strategies are sliding windows and tumbling windows
     * @param reduceFn       The reduce function that you want to apply to all the values of a key.
     */
-  override def reduceByKeyAndWindow[K, V](
+  override def reduceByKeyAndWindow[K, T](
       keyExtractor: R => K,
-      valueExtractor: R => V,
+      valueExtractor: R => T,
       windowCfg: WindowConfig,
-      reduceFn: (V, V) => V): Streamlet[KeyValue[KeyedWindow[K], V]] = {
+      reduceFn: (T, T) => T): KVStreamlet[KeyedWindow[K], T] = {
     val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
-    val javaValueExtractor = toSerializableFunction[R, V](valueExtractor)
-    val javaReduceFunction = toSerializableBinaryOperator[V](reduceFn)
-
-    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, V](
-      javaKeyExtractor,
-      javaValueExtractor,
-      windowCfg,
-      javaReduceFunction)
-    fromJavaStreamlet[KeyValue[KeyedWindow[K], V]](newJavaStreamlet)
+    val javaValueExtractor = toSerializableFunction[R, T](valueExtractor)
+    val javaReduceFunction = toSerializableBinaryOperator[T](reduceFn)
+    // Delegate to the Java streamlet, then wrap the returned Java KVStreamlet for Scala callers
+    val newJavaKVStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
+        javaKeyExtractor,
+        javaValueExtractor,
+        windowCfg,
+        javaReduceFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
   }
 
   /**
@@ -301,16 +305,16 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
       keyExtractor: R => K,
       windowCfg: WindowConfig,
       identity: T,
-      reduceFn: (T, R) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
+      reduceFn: (T, R) => T): KVStreamlet[KeyedWindow[K], T] = {
     val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
     val javaReduceFunction = toSerializableBiFunction[T, R, T](reduceFn)
 
-    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
-      javaKeyExtractor,
-      windowCfg,
-      identity,
-      javaReduceFunction)
-    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
+    val newJavaKVStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
+        javaKeyExtractor,
+        windowCfg,
+        identity,
+        javaReduceFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
   }
 
   /**
@@ -379,6 +383,32 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     fromJavaStreamlet[R](newJavaStreamlet)
   }
 
+
+  /**
+   * Returns a KVStreamlet[K, R] whose keys come from applying keyExtractor to each element
+   * @param keyExtractor Function applied to an element of this streamlet to obtain its key
+   */
+  override def keyBy[K](keyExtractor: R => K): KVStreamlet[K, R] = {
+    // Convert the extractor via toSerializableFunction, delegate to the Java streamlet, wrap it
+    val serializableKeyFn = toSerializableFunction[R, K](keyExtractor)
+    val keyedJavaStreamlet = javaStreamlet.keyBy[K](serializableKeyFn)
+    KVStreamletImpl.fromJavaKVStreamlet[K, R](keyedJavaStreamlet)
+  }
+
+  /**
+   * Returns a KVStreamlet[K, T] built by applying a key extractor and a value extractor to each
+   * element of this Streamlet
+   * @param keyExtractor Function applied to an element of this streamlet to obtain its key
+   * @param valueExtractor Function applied to an element of this streamlet to obtain its value
+   */
+  override def keyBy[K, T](keyExtractor: R => K, valueExtractor: R => T): KVStreamlet[K, T] = {
+    // Both extractors go through toSerializableFunction before delegating to the Java streamlet
+    val serializableKeyFn = toSerializableFunction[R, K](keyExtractor)
+    val serializableValueFn = toSerializableFunction[R, T](valueExtractor)
+    val keyedJavaStreamlet = javaStreamlet.keyBy[K, T](serializableKeyFn, serializableValueFn)
+    KVStreamletImpl.fromJavaKVStreamlet[K, T](keyedJavaStreamlet)
+  }
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index 03c1f2b..d8efbdf 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -29,6 +29,7 @@ java_tests(
     "org.apache.heron.api.bolt.BaseWindowedBoltTest",
     "org.apache.heron.streamlet.impl.StreamletImplTest",
     "org.apache.heron.streamlet.impl.operators.JoinOperatorTest",
+    "org.apache.heron.streamlet.impl.operators.KeyByOperatorTest",
     "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
     "org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
     "org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 5d9053a..d249664 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -44,6 +44,7 @@ import org.apache.heron.streamlet.Context;
 import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletRichOperator;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
+import org.apache.heron.streamlet.KVStreamlet;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializablePredicate;
 import org.apache.heron.streamlet.SerializableTransformer;
@@ -55,6 +56,7 @@ import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
@@ -353,6 +355,20 @@ public class StreamletImplTest {
   }
 
   @Test
+  public void testKeyByStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<Long, Double> kvStream = baseStreamlet.keyBy(x -> Math.round(x));
+    // keyBy() must yield a KeyByStreamlet that is registered as the only child of the source
+    assertTrue(kvStream instanceof KeyByStreamlet);
+    KeyByStreamlet<Double, Long, Double> mStreamlet =
+        (KeyByStreamlet<Double, Long, Double>) kvStream;
+    assertEquals(1, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    // JUnit's assertEquals takes (expected, actual); keep that order for correct failure messages
+    assertEquals(1, supplierStreamlet.getChildren().size());
+    assertEquals(kvStream, supplierStreamlet.getChildren().get(0));
+  }
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testSimpleBuild() throws Exception {
     Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java
new file mode 100644
index 0000000..4f88d05
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/KeyByOperatorTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IOutputCollector;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.common.utils.topology.TopologyContextImpl;
+import org.apache.heron.common.utils.tuple.TupleImpl;
+import org.apache.heron.streamlet.KeyValue;
+
+public class KeyByOperatorTest {
+  // Collects every object the operator emits through the stubbed output collector
+  private List<Object> emittedTuples;
+
+  @Before
+  public void setUp() {
+    emittedTuples = new LinkedList<>();
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testKeyByOperator() {
+    KeyByOperator<String, String, Integer> keyByOperator = getKeyByOperator();
+    // Expected key for each emitted value; indexed by value so no entry overwrites another
+    HashMap<Integer, String> expectedKeys = new HashMap<>();
+    expectedKeys.put(0, "even");
+    expectedKeys.put(1, "odd");
+    expectedKeys.put(2, "even");
+
+    TopologyAPI.StreamId componentStreamId
+        = TopologyAPI.StreamId.newBuilder()
+        .setComponentName("sourceComponent").setId("default").build();
+
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("0")));
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("1")));
+    keyByOperator.execute(getTuple(componentStreamId, new Fields("a"), new Values("2")));
+
+    Assert.assertEquals(3, emittedTuples.size());
+    for (Object object : emittedTuples) {
+      KeyValue<String, Integer> tuple = (KeyValue<String, Integer>) object;
+      Assert.assertTrue(expectedKeys.containsKey(tuple.getValue()));
+      Assert.assertEquals(expectedKeys.get(tuple.getValue()), tuple.getKey());
+    }
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private KeyByOperator<String, String, Integer> getKeyByOperator() {
+    KeyByOperator<String, String, Integer> keyByOperator =
+        new KeyByOperator<String, String, Integer>(
+            x -> (Integer.valueOf(x) % 2 == 0) ? "even" : "odd",
+            x -> Integer.valueOf(x));
+    // Prepare with a stub collector: emit() records the values, every other callback is a no-op
+    keyByOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class),
+        new OutputCollector(new IOutputCollector() {
+
+          @Override
+          public void reportError(Throwable error) {
+          }
+
+          @Override
+          public List<Integer> emit(String streamId,
+                                    Collection<Tuple> anchors, List<Object> tuple) {
+            emittedTuples.addAll(tuple);
+            return null;
+          }
+
+          @Override
+          public void emitDirect(int taskId, String streamId,
+                                 Collection<Tuple> anchors, List<Object> tuple) {
+          }
+
+          @Override
+          public void ack(Tuple input) {
+          }
+
+          @Override
+          public void fail(Tuple input) {
+          }
+        }));
+    return keyByOperator;
+  }
+
+  private Tuple getTuple(TopologyAPI.StreamId streamId, final Fields fields, Values values) {
+    // The tuple always reports sourceComponent/default as its source global stream id
+    TopologyContext topologyContext = getContext(fields);
+    return new TupleImpl(topologyContext, streamId, 0,
+        null, values, 1) {
+      @Override
+      public TopologyAPI.StreamId getSourceGlobalStreamId() {
+        return TopologyAPI.StreamId.newBuilder().setComponentName("sourceComponent")
+            .setId("default").build();
+      }
+    };
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private TopologyContext getContext(final Fields fields) {
+    TopologyBuilder builder = new TopologyBuilder();
+    return new TopologyContextImpl(new Config(),
+        builder.createTopology()
+            .setConfig(new Config())
+            .setName("test")
+            .setState(TopologyAPI.TopologyState.RUNNING)
+            .getTopology(),
+        new HashMap(), 1, null) {
+      @Override
+      public Fields getComponentOutputFields(
+          String componentId, String streamId) {
+        return fields;
+      }
+
+    };
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 5664d64..76de00c 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -40,6 +40,7 @@ import org.apache.heron.streamlet.impl.streamlets.{
   FlatMapStreamlet,
   LogStreamlet,
   JoinStreamlet,
+  KeyByStreamlet,
   MapStreamlet,
   ReduceByKeyAndWindowStreamlet,
   RemapStreamlet,
@@ -638,6 +639,33 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, mapStreamlet.getChildren.size())
   }
 
+  test("StreamletImpl should support keyBy operation") {
+    // Source emitting Random.nextInt(10) values, configured with 3 partitions
+    val source = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    // Key each element by its remainder modulo 3 and keep the element itself as the value
+    source
+      .keyBy[Int, Int]((element: Int) => element % 3, (element: Int) => element)
+      .setName("KeyBy_Streamlet_1")
+      .setNumPartitions(5)
+
+    // The source must now own exactly one child, and it must be a KeyByStreamlet
+    val sourceImpl = source.asInstanceOf[StreamletImpl[Int]]
+    val children = sourceImpl.getChildren
+    assertEquals(1, children.size)
+    val onlyChild = children(0)
+    assertTrue(onlyChild.isInstanceOf[KeyByStreamlet[_, _, _]])
+
+    // The name and partition count set after keyBy must be visible on that child
+    val keyByStreamlet = onlyChild.asInstanceOf[KeyByStreamlet[Int, Int, Int]]
+    assertEquals("KeyBy_Streamlet_1", keyByStreamlet.getName)
+    assertEquals(5, keyByStreamlet.getNumPartitions)
+    assertEquals(0, keyByStreamlet.getChildren.size())
+  }
+
   private def verifyClonedStreamlets[R](supplierStreamlet: Streamlet[R],
                                         numClones: Int): Unit = {
     val supplierStreamletImpl =