You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/12/21 22:02:49 UTC

[GitHub] nwangtw closed pull request #3131: Nwang/add count and reduce operations in streamlet

nwangtw closed pull request #3131: Nwang/add count and reduce operations in streamlet
URL: https://github.com/apache/incubator-heron/pull/3131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 5607e8a229..1a108a4475 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -171,6 +171,27 @@
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
              JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction);
 
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   * to be reduced on
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  <K, T> KVStreamlet<K, T> reduceByKey(SerializableFunction<R, K> keyExtractor,
+                                       SerializableFunction<R, T> valueExtractor,
+                                       SerializableBinaryOperator<T> reduceFn);
+
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param identity The identity element is the initial value for each key
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  <K, T> KVStreamlet<K, T> reduceByKey(SerializableFunction<R, K> keyExtractor,
+                                       T identity,
+                                       SerializableBiFunction<T, R, ? extends T> reduceFn);
+
   /**
    * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
    * windowCfg and applying reduceFn on those tuples.
@@ -258,6 +279,22 @@
   <K, V> KVStreamlet<K, V> keyBy(SerializableFunction<R, K> keyExtractor,
                                  SerializableFunction<R, V> valueExtractor);
 
+  /**
+   * Returns a new stream of <key, count> by counting tuples in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  <K> KVStreamlet<K, Long> countByKey(SerializableFunction<R, K> keyExtractor);
+
+  /**
+   * Returns a new stream of <key, count> by counting tuples over a window in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param windowCfg This is a specification of what kind of windowing strategy you like to have.
+   * Typical windowing strategies are sliding windows and tumbling windows
+   * Note that there could be 0 or multiple target stream ids
+   */
+  <K> KVStreamlet<KeyedWindow<K>, Long> countByKeyAndWindow(
+      SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg);
+
   /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 9ce51b4ad8..c05c654567 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -45,15 +45,19 @@
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CountByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SplitStreamlet;
@@ -109,6 +113,7 @@ public boolean allBuilt() {
 
   protected enum StreamletNamePrefix {
     CONSUMER("consumer"),
+    COUNT("count"),
     CUSTOM("custom"),
     CUSTOM_BASIC("customBasic"),
     CUSTOM_WINDOW("customWindow"),
@@ -119,7 +124,7 @@ public boolean allBuilt() {
     LOGGER("logger"),
     MAP("map"),
     SOURCE("generator"),
-    REDUCE("reduceByKeyAndWindow"),
+    REDUCE("reduce"),
     REMAP("remap"),
     SINK("sink"),
     SPLIT("split"),
@@ -445,6 +450,47 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
     return retval;
   }
 
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   * to be reduced on
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  @Override
+  public <K, T> KVStreamlet<K, T> reduceByKey(SerializableFunction<R, K> keyExtractor,
+                                              SerializableFunction<R, T> valueExtractor,
+                                              SerializableBinaryOperator<T> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(valueExtractor, "valueExtractor cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
+    ReduceByKeyStreamlet<R, K, T> retval =
+        new ReduceByKeyStreamlet<>(this, keyExtractor, valueExtractor, reduceFn);
+    addChild(retval);
+    return retval;
+  }
+
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param identity The identity element is the initial value for each key
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  @Override
+  public <K, T> KVStreamlet<K, T> reduceByKey(SerializableFunction<R, K> keyExtractor,
+                                              T identity,
+                                              SerializableBiFunction<T, R, ? extends T> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(identity, "identity cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
+    GeneralReduceByKeyStreamlet<R, K, T> retval =
+        new GeneralReduceByKeyStreamlet<>(this, keyExtractor, identity, reduceFn);
+    addChild(retval);
+    return retval;
+  }
+
   /**
    * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
    * windowCfg and applying reduceFn on those tuples.
@@ -644,4 +690,38 @@ public void toSink(Sink<R> sink) {
     addChild(retval);
     return retval;
   }
+
+  /**
+   * Returns a new stream of <key, count> by counting tuples in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  @Override
+  public <K> KVStreamlet<K, Long>
+      countByKey(SerializableFunction<R, K> keyExtractor) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+
+    CountByKeyStreamlet<R, K> retval = new CountByKeyStreamlet<>(this, keyExtractor);
+    addChild(retval);
+    return retval;
+  }
+
+
+  /**
+   * Returns a new stream of <key, count> by counting tuples over a window in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param windowCfg This is a specification of what kind of windowing strategy you like to have.
+   * Typical windowing strategies are sliding windows and tumbling windows
+   * Note that there could be 0 or multiple target stream ids
+   */
+  @Override
+  public <K> KVStreamlet<KeyedWindow<K>, Long> countByKeyAndWindow(
+      SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+
+    CountByKeyAndWindowStreamlet<R, K> retval =
+        new CountByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg);
+    addChild(retval);
+    return retval;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java
new file mode 100644
index 0000000000..462b0a2e9f
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.SerializableBiFunction;
+import org.apache.heron.streamlet.SerializableFunction;
+
+/**
+ * ReduceByKeyOperator is the class that implements the reduce functionality.
+ * It takes in the key and value extractors, an initial value, and a reduceFunction
+ * function as the input. The result are key value pairs of <K, T>.
+ * R: Input data type, K, Key type, T, Value type.
+ * TODO: make it stateful or create a new stateful operator. The tricky part is how
+ * to convert K and T to State<> which needs to be serializable.
+ */
+public class GeneralReduceByKeyOperator<R, K, T> extends StreamletOperator<R, KeyValue<K, T>> {
+  private SerializableFunction<R, K> keyExtractor;
+  private T identity;
+  private SerializableBiFunction<T, R, ? extends T> reduceFn;
+
+  private Map<K, T> reduceMap;
+
+  public GeneralReduceByKeyOperator(SerializableFunction<R, K> keyExtractor,
+                                    T identity,
+                                    SerializableBiFunction<T, R, ? extends T> reduceFn) {
+    this.keyExtractor = keyExtractor;
+    this.identity = identity;
+    this.reduceFn = reduceFn;
+    reduceMap = new HashMap<K, T>();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    R obj = (R) tuple.getValue(0);
+    K key = keyExtractor.apply(obj);
+
+    T oldValue = reduceMap.getOrDefault(key, identity);
+    T newValue = reduceFn.apply(oldValue, obj);
+
+    reduceMap.put(key, newValue);
+    collector.emit(new Values(new KeyValue<K, T>(key, newValue)));
+    collector.ack(tuple);
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java
new file mode 100644
index 0000000000..5830a7c75f
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyOperator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.SerializableBinaryOperator;
+import org.apache.heron.streamlet.SerializableFunction;
+
+/**
+ * ReduceByKeyOperator is the class that implements the reduce functionality.
+ * It takes in the key and value extractors, an initial value, and a reduceFunction
+ * function as the input. The result are key value pairs of <K, T>.
+ * R: Input data type, K, Key type, T, Value type.
+ * TODO: make it stateful or create a new stateful operator. The tricky part is how
+ * to convert K and T to State<> which needs to be serializable.
+ */
+public class ReduceByKeyOperator<R, K, T> extends StreamletOperator<R, KeyValue<K, T>> {
+  private SerializableFunction<R, K> keyExtractor;
+  private SerializableFunction<R, T> valueExtractor;
+  private SerializableBinaryOperator<T> reduceFn;
+
+  private Map<K, T> reduceMap;
+
+  public ReduceByKeyOperator(SerializableFunction<R, K> keyExtractor,
+                             SerializableFunction<R, T> valueExtractor,
+                             SerializableBinaryOperator<T> reduceFn) {
+    this.keyExtractor = keyExtractor;
+    this.valueExtractor = valueExtractor;
+    this.reduceFn = reduceFn;
+    reduceMap = new HashMap<K, T>();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    R obj = (R) tuple.getValue(0);
+    K key = keyExtractor.apply(obj);
+    T value = valueExtractor.apply(obj);
+
+    T newValue;
+    if (reduceMap.containsKey(key)) {
+      newValue = reduceFn.apply(reduceMap.get(key), value);
+    } else {
+      newValue = value;
+    }
+
+    reduceMap.put(key, newValue);
+    collector.emit(new Values(new KeyValue<K, T>(key, newValue)));
+    collector.ack(tuple);
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
new file mode 100644
index 0000000000..aac859f29f
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyedWindow;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.WindowConfig;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
+import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
+
+/**
+ * CountByKeyAndWindowStreamlet represents a KVStreamlet that is the result of
+ * counting all elements within each window defined by a user supplied Window Config.
+ * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
+ * KeyWindowInfo&lt;K&gt; type and the value is Long.
+ */
+public class CountByKeyAndWindowStreamlet<R, K>
+    extends KVStreamletImpl<KeyedWindow<K>, Long> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
+  private WindowConfig windowCfg;
+
+  public CountByKeyAndWindowStreamlet(StreamletImpl<R> parent,
+                                      SerializableFunction<R, K> keyExtractor,
+                                      WindowConfig windowCfg) {
+    this.parent = parent;
+    this.keyExtractor = keyExtractor;
+    this.windowCfg = windowCfg;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.COUNT, stageNames);
+    // Count is a special case of reduce operation. Hence ReduceByKeyAndWindowOperator
+    // is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
+    ReduceByKeyAndWindowOperator<R, K, Long> bolt =
+        new ReduceByKeyAndWindowOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+    windowCfg.applyTo(bolt);
+    bldr.setBolt(getName(), bolt, getNumPartitions())
+        .customGrouping(parent.getName(), parent.getStreamId(),
+            // TODO: rename ReduceByKeyAndWindowCustomGrouping
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
new file mode 100644
index 0000000000..965d897eea
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
+import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
+
+/**
+ * ReduceByKeyAndWindowStreamlet represents a KVStreamlet that is the result of
+ * applying user supplied reduceFn on all elements within each window defined by a
+ * user supplied Window Config.
+ * Note that this is a stateful operation. And K and T types need to be serializable.
+ * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
+ * KeyWindowInfo&lt;K&gt; type and the value is of type V.
+ */
+public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
+
+  public CountByKeyStreamlet(StreamletImpl<R> parent, SerializableFunction<R, K> keyExtractor) {
+    this.parent = parent;
+    this.keyExtractor = keyExtractor;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.COUNT, stageNames);
+    // Count is a special case of reduce operation. Hence ReduceByKeyAndWindowOperator
+    // is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
+    ReduceByKeyOperator<R, K, Long> bolt =
+        new ReduceByKeyOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+    bldr.setBolt(getName(), bolt, getNumPartitions())
+        .customGrouping(parent.getName(), parent.getStreamId(),
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
new file mode 100644
index 0000000000..9b4d2fe4fe
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializableBiFunction;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
+import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
+
+/**
+ * GeneralReduceByKeyStreamlet represents a KVStreamlet that is the result of
+ * applying user supplied reduceFn on all elements.
+ * GeneralReduceByKeyStreamlet's elements are of KeyValue type where the key is
+ * KeyWindowInfo&lt;K&gt; type and the value is of type T.
+ */
+public class GeneralReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
+  private T identity;
+  private SerializableBiFunction<T, R, ? extends T> reduceFn;
+
+  public GeneralReduceByKeyStreamlet(StreamletImpl<R> parent,
+                                     SerializableFunction<R, K> keyExtractor,
+                                     T identity,
+                                     SerializableBiFunction<T, R, ? extends T> reduceFn) {
+    this.parent = parent;
+    this.keyExtractor = keyExtractor;
+    this.identity = identity;
+    this.reduceFn = reduceFn;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
+    GeneralReduceByKeyOperator<R, K, T> bolt =
+        new GeneralReduceByKeyOperator<R, K, T>(keyExtractor, identity, reduceFn);
+    bldr.setBolt(getName(), bolt, getNumPartitions())
+        .customGrouping(parent.getName(), parent.getStreamId(),
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
new file mode 100644
index 0000000000..ea84687a0e
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.SerializableBinaryOperator;
+import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.impl.KVStreamletImpl;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
+import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
+
+/**
+ * ReduceByKeyStreamlet represents a KVStreamlet that is the result of
+ * applying user supplied reduceFn on all elements.
+ * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
+ * KeyWindowInfo&lt;K&gt; type and the value is of type T.
+ */
+public class ReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
+  private SerializableFunction<R, T> valueExtractor;
+  private SerializableBinaryOperator<T> reduceFn;
+
+  public ReduceByKeyStreamlet(StreamletImpl<R> parent,
+                              SerializableFunction<R, K> keyExtractor,
+                              SerializableFunction<R, T> valueExtractor,
+                              SerializableBinaryOperator<T> reduceFn) {
+    this.parent = parent;
+    this.keyExtractor = keyExtractor;
+    this.valueExtractor = valueExtractor;
+    this.reduceFn = reduceFn;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
+    ReduceByKeyOperator<R, K, T> bolt =
+        new ReduceByKeyOperator<R, K, T>(keyExtractor, valueExtractor, reduceFn);
+    bldr.setBolt(getName(), bolt, getNumPartitions())
+        .customGrouping(parent.getName(), parent.getStreamId(),
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
+    return true;
+  }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 46a6c37f28..8151941c1f 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -185,6 +185,28 @@ trait Streamlet[R] {
       joinType: JoinType,
       joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]]
 
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   * to be reduced on
+   * @param identity The identity element is the initial value for each key
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  def reduceByKey[K, T](keyExtractor: R => K,
+                        valueExtractor: R => T,
+                        reduceFn: (T, T) => T): Streamlet[KeyValue[K, T]]
+
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param identity The identity element is the initial value for each key
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  def reduceByKey[K, T](keyExtractor: R => K,
+                        identity: T,
+                        reduceFn: (T, R) => T): Streamlet[KeyValue[K, T]]
+
   /**
     * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
     * windowCfg and applying reduceFn on those tuples.
@@ -264,6 +286,22 @@ trait Streamlet[R] {
    */
   def split(splitFns: Map[String, R => Boolean]): Streamlet[R]
 
+/**
+   * Returns a new stream of <key, count> by counting tuples in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  def countByKey[K](keyExtractor: R => K): Streamlet[KeyValue[K, java.lang.Long]]
+
+  /**
+   * Returns a new stream of <key, count> by counting tuples over a window in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param windowCfg This is a specification of what kind of windowing strategy you like to have.
+   * Typical windowing strategies are sliding windows and tumbling windows
+   * Note that there could be 0 or multiple target stream ids
+   */
+  def countByKeyAndWindow[K](keyExtractor: R => K,
+      windowCfg: WindowConfig): Streamlet[KeyValue[KeyedWindow[K], java.lang.Long]]
+
   /**
    * Return a new KVStreamlet<K, R> by applying key extractor to each element of this Streamlet
    * @param keyExtractor The function applied to a tuple of this streamlet to get the key
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 84b04e23fd..ea68949242 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -18,9 +18,10 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
-import java.util.{Map => JMap}
-import java.util.{HashMap => JHashMap}
-
+import java.util.{
+  HashMap => JHashMap,
+  Map => JMap
+}
 import scala.collection.JavaConverters
 
 import org.apache.heron.api.grouping.StreamGrouping
@@ -259,6 +260,46 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], T](newJavaKVStreamlet)
   }
 
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
+   * to be reduced on
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  override def reduceByKey[K, T](keyExtractor: R => K,
+                                 valueExtractor: R => T,
+                                 reduceFn: (T, T) => T): KVStreamlet[K, T] = {
+    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
+    val javaValueExtractor = toSerializableFunction[R, T](valueExtractor)
+    val javaReduceFunction = toSerializableBinaryOperator[T](reduceFn)
+
+    val newJavaKVStreamlet = javaStreamlet.reduceByKey[K, T](
+      javaKeyExtractor,
+      javaValueExtractor,
+      javaReduceFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[K, T](newJavaKVStreamlet)
+  }
+
+  /**
+   * Return a new Streamlet accumulating tuples of this streamlet and applying reduceFn on those tuples.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param identity The identity element is the initial value for each key
+   * @param reduceFn The reduce function that you want to apply to all the values of a key.
+   */
+  override def reduceByKey[K, T](keyExtractor: R => K,
+                           identity: T,
+                           reduceFn: (T, R) => T): KVStreamlet[K, T] = {
+    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
+    val javaReduceFunction = toSerializableBiFunction[T, R, T](reduceFn)
+
+    val newJavaKVStreamlet = javaStreamlet.reduceByKey[K, T](
+      javaKeyExtractor,
+      identity,
+      javaReduceFunction)
+    KVStreamletImpl.fromJavaKVStreamlet[K, T](newJavaKVStreamlet)
+  }
+
   /**
     * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
     * windowCfg and applying reduceFn on those tuples.
@@ -409,6 +450,33 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     KVStreamletImpl.fromJavaKVStreamlet[K, T](newJavaKVStreamlet)
   }
 
+  /**
+   * Returns a new stream of <key, count> by counting tuples in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   */
+  override def countByKey[K](keyExtractor: R => K): KVStreamlet[K, java.lang.Long] = {
+    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
+
+    val newJavaKVStreamlet = javaStreamlet.countByKey[K](javaKeyExtractor)
+    KVStreamletImpl.fromJavaKVStreamlet[K, java.lang.Long](newJavaKVStreamlet)
+  }
+
+  /**
+   * Returns a new stream of <key, count> by counting tuples over a window in this stream on each key.
+   * @param keyExtractor The function applied to a tuple of this streamlet to get the key
+   * @param windowCfg This is a specification of what kind of windowing strategy you like to have.
+   * Typical windowing strategies are sliding windows and tumbling windows
+   * Note that there could be 0 or multiple target stream ids
+   */
+  override def countByKeyAndWindow[K](keyExtractor: R => K,
+      windowCfg: WindowConfig): KVStreamlet[KeyedWindow[K], java.lang.Long] = {
+
+    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
+
+    val newJavaKVStreamlet = javaStreamlet.countByKeyAndWindow[K](javaKeyExtractor, windowCfg)
+    KVStreamletImpl.fromJavaKVStreamlet[KeyedWindow[K], java.lang.Long](newJavaKVStreamlet)
+  }
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index d249664644..fa3aff7c31 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -45,6 +45,7 @@
 import org.apache.heron.streamlet.IStreamletRichOperator;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializablePredicate;
 import org.apache.heron.streamlet.SerializableTransformer;
@@ -52,13 +53,17 @@
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CountByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
@@ -368,6 +373,75 @@ public void testKeyByStreamlet() {
     assertEquals(supplierStreamlet.getChildren().get(0), kvStream);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testReduceByKeyStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
+        .<String, Double>reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
+            x -> x,
+            (x, y) -> x + y);  // A sum operation
+
+    assertTrue(streamlet instanceof ReduceByKeyStreamlet);
+    ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
+        (ReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGeneralReduceByKeyStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
+        .reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
+            0.0,
+            (x, y) -> x + y);  // A sum operation
+
+    assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
+    GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =
+        (GeneralReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCountByKeyStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<String, Long> streamlet = baseStreamlet.setNumPartitions(20)
+        .countByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"));
+
+    assertTrue(streamlet instanceof CountByKeyStreamlet);
+    CountByKeyStreamlet<Double, String> mStreamlet =
+        (CountByKeyStreamlet<Double, String>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCountByKeyAndWindowStreamlet() {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    KVStreamlet<KeyedWindow<String>, Long> streamlet = baseStreamlet.setNumPartitions(20)
+        .countByKeyAndWindow(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
+                             WindowConfig.TumblingCountWindow(10));
+
+    assertTrue(streamlet instanceof CountByKeyAndWindowStreamlet);
+    CountByKeyAndWindowStreamlet<Double, String> mStreamlet =
+        (CountByKeyAndWindowStreamlet<Double, String>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testSimpleBuild() throws Exception {
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 76de00ca5a..d4afd0d94f 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -35,13 +35,17 @@ import org.apache.heron.streamlet.{
 }
 import org.apache.heron.streamlet.impl.streamlets.{
   ConsumerStreamlet,
+  CountByKeyStreamlet,
+  CountByKeyAndWindowStreamlet,
   CustomStreamlet,
   FilterStreamlet,
   FlatMapStreamlet,
+  GeneralReduceByKeyStreamlet,
   LogStreamlet,
   JoinStreamlet,
   KeyByStreamlet,
   MapStreamlet,
+  ReduceByKeyStreamlet,
   ReduceByKeyAndWindowStreamlet,
   RemapStreamlet,
   TransformStreamlet,
@@ -610,7 +614,63 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, customStreamlet.getChildren.size())
   }
 
-  test("StreamletImpl should support reduce operation") {
+  test("StreamletImpl should support reduce by key operation") {
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .reduceByKey[Int, Int]((x: Int) => x * 100,
+                             (x: Int) => x,
+                             (x: Int, y: Int) => x + y)  // sum operation
+      .setName("Reduce_Streamlet_1")
+      .setNumPartitions(5)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Int]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[ReduceByKeyStreamlet[_, _, _]])
+    val reduceStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[ReduceByKeyStreamlet[Int, Int, Int]]
+    assertEquals("Reduce_Streamlet_1", reduceStreamlet.getName)
+    assertEquals(5, reduceStreamlet.getNumPartitions)
+    assertEquals(0, reduceStreamlet.getChildren.size())
+  }
+
+  test("StreamletImpl should support general reduce by key operation") {
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .reduceByKey[Int, Int]((key: Int) => key * 100,
+                             0,
+                             (x: Int, y: Int) => x + y)  // sum operation
+      .setName("Reduce_Streamlet_1")
+      .setNumPartitions(5)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Int]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[GeneralReduceByKeyStreamlet[_, _, _]])
+    val reduceStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[GeneralReduceByKeyStreamlet[Int, Int, Int]]
+    assertEquals("Reduce_Streamlet_1", reduceStreamlet.getName)
+    assertEquals(5, reduceStreamlet.getNumPartitions)
+    assertEquals(0, reduceStreamlet.getChildren.size())
+  }
+
+  test("StreamletImpl should support reduce by key and window operation") {
     val supplierStreamlet = builder
       .newSource(() => Random.nextInt(10))
       .setName("Supplier_Streamlet_1")
@@ -631,12 +691,65 @@ class StreamletImplTest extends BaseFunSuite {
       supplierStreamletImpl
         .getChildren(0)
         .isInstanceOf[ReduceByKeyAndWindowStreamlet[_, _, _]])
-    val mapStreamlet = supplierStreamletImpl
+    val reduceStreamlet = supplierStreamletImpl
       .getChildren(0)
       .asInstanceOf[ReduceByKeyAndWindowStreamlet[Int, Int, Int]]
-    assertEquals("Reduce_Streamlet_1", mapStreamlet.getName)
-    assertEquals(5, mapStreamlet.getNumPartitions)
-    assertEquals(0, mapStreamlet.getChildren.size())
+    assertEquals("Reduce_Streamlet_1", reduceStreamlet.getName)
+    assertEquals(5, reduceStreamlet.getNumPartitions)
+    assertEquals(0, reduceStreamlet.getChildren.size())
+  }
+
+  test("StreamletImpl should support count by key operation") {
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .countByKey[Int]((x: Int) => x * 100)
+      .setName("Count_Streamlet_1")
+      .setNumPartitions(5)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Int]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[CountByKeyStreamlet[_, _]])
+    val countStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[CountByKeyStreamlet[Int, Int]]
+    assertEquals("Count_Streamlet_1", countStreamlet.getName)
+    assertEquals(5, countStreamlet.getNumPartitions)
+    assertEquals(0, countStreamlet.getChildren.size())
+  }
+
+  test("StreamletImpl should support count by key and window operation") {
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextInt(10))
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .countByKeyAndWindow[Int]((x: Int) => x * 100,
+                                WindowConfig.TumblingCountWindow(10))
+      .setName("Count_Streamlet_1")
+      .setNumPartitions(5)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Int]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[CountByKeyAndWindowStreamlet[_, _]])
+    val countStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[CountByKeyAndWindowStreamlet[Int, Int]]
+    assertEquals("Count_Streamlet_1", countStreamlet.getName)
+    assertEquals(5, countStreamlet.getNumPartitions)
+    assertEquals(0, countStreamlet.getChildren.size())
   }
 
   test("StreamletImpl should support keyBy operation") {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services