You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/24 18:28:05 UTC

[incubator-heron] branch master updated: Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new e292f02  Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)
e292f02 is described below

commit e292f028411ab8eed803b40cc49fa0a05bcbc79f
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Oct 24 11:27:57 2018 -0700

    Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)
    
    * Add interfaces IStreamletOperator, IStreamletBasicOperator and IStreamletWindowOperator and support them in Streamlet
    
    * Fix HiddenField
---
 .../heron/streamlet/IStreamletBasicOperator.java   |  30 ++++
 .../apache/heron/streamlet/IStreamletOperator.java |  30 ++++
 .../heron/streamlet/IStreamletWindowOperator.java  |  30 ++++
 .../java/org/apache/heron/streamlet/Streamlet.java |  26 +++-
 .../apache/heron/streamlet/impl/StreamletImpl.java |  49 +++++++
 .../streamlet/impl/operators/FilterOperator.java   |   2 +-
 .../streamlet/impl/operators/FlatMapOperator.java  |   2 +-
 .../GeneralReduceByKeyAndWindowOperator.java       |   2 +-
 .../streamlet/impl/operators/JoinOperator.java     |   2 +-
 .../streamlet/impl/operators/MapOperator.java      |   2 +-
 .../operators/ReduceByKeyAndWindowOperator.java    |   2 +-
 .../impl/operators/StreamletOperator.java          |   5 +-
 .../impl/operators/StreamletWindowOperator.java    |   5 +-
 .../impl/operators/TransformOperator.java          |   2 +-
 .../streamlet/impl/operators/UnionOperator.java    |   2 +-
 .../heron/streamlet/impl/sinks/ComplexSink.java    |   2 +-
 .../heron/streamlet/impl/sinks/ConsumerSink.java   |   2 +-
 .../apache/heron/streamlet/impl/sinks/LogSink.java |   2 +-
 .../impl/streamlets/CustomBasicStreamlet.java      |  55 +++++++
 .../streamlet/impl/streamlets/CustomStreamlet.java |  55 +++++++
 .../impl/streamlets/CustomWindowStreamlet.java     |  55 +++++++
 .../apache/heron/streamlet/scala/Streamlet.scala   |  27 ++++
 .../heron/streamlet/scala/impl/StreamletImpl.scala |  36 +++++
 .../org/apache/heron/resource/TestBasicBolt.java}  |  38 ++---
 .../java/org/apache/heron/resource/TestBolt.java}  |  41 +++---
 .../org/apache/heron/resource/TestWindowBolt.java} |  42 +++---
 .../heron/streamlet/impl/StreamletImplTest.java    |  64 ++++++++
 heron/api/tests/scala/BUILD                        |   5 +-
 .../org/apache/heron/resource/TestBasicBolt.scala  |  48 ++++++
 .../scala/org/apache/heron/resource/TestBolt.scala |  53 +++++++
 .../org/apache/heron/resource/TestWindowBolt.scala |  52 +++++++
 .../streamlet/scala/impl/StreamletImplTest.scala   | 162 ++++++++++++++++++++-
 32 files changed, 858 insertions(+), 72 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
new file mode 100644
index 0000000..e6b615a
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IBasicBolt;
+
+/**
+ * The interface for streamlet operators. It can be used to create
+ * operators based on existing Bolts (subclasses of IBasicBolt).
+ */
+public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
new file mode 100644
index 0000000..24b989c
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IRichBolt;
+
+/**
+ * The interface for custom operators: it can be used to create
+ * operators based on existing Bolts (subclasses of IRichBolt).
+ */
+public interface IStreamletOperator<R, T> extends IRichBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
new file mode 100644
index 0000000..7369fca
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IWindowedBolt;
+
+/**
+ * The interface for streamlet operators. It can be used to create
+ * operators based on existing Bolts (subclasses of IWindowedBolt).
+ */
+public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 82dbef5..2490537 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -191,7 +191,7 @@ public interface Streamlet<R> {
   Streamlet<R> union(Streamlet<? extends R> other);
 
   /**
-   * Returns a  new Streamlet by applying the transformFunction on each element of this streamlet.
+   * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
    * Before starting to cycle the transformFunction over the Streamlet, the open function is called.
    * This allows the transform Function to do any kind of initialization/loading, etc.
    * @param serializableTransformer The transformation function to be applied
@@ -202,6 +202,30 @@ public interface Streamlet<R> {
       SerializableTransformer<R, ? extends T> serializableTransformer);
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
+
+  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index fb5b950..677b34e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,6 +27,9 @@ import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.JoinType;
 import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
@@ -42,6 +45,9 @@ import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -100,6 +106,9 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
 
   protected enum StreamletNamePrefix {
     CONSUMER("consumer"),
+    CUSTOM("custom"),
+    CUSTOM_BASIC("customBasic"),
+    CUSTOM_WINDOW("customWindow"),
     FILTER("filter"),
     FLATMAP("flatmap"),
     REDUCE("reduceByKeyAndWindow"),
@@ -484,6 +493,46 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
   }
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+    CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
+    CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the Streamlet produced by the operator
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
+    CustomWindowStreamlet<R, T> customStreamlet =
+        new CustomWindowStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
+  /**
    * Verifies the requirement as the utility function.
    * @param requirement The requirement to verify
    * @param errorMessage The error message
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index 9b42ee6..af9afe8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -34,7 +34,7 @@ import org.apache.heron.streamlet.SerializablePredicate;
  * For every tuple that it encounters, the filter function is run
  * and the tuple is re-emitted if the predicate evaluates to true
  */
-public class FilterOperator<R> extends StreamletOperator {
+public class FilterOperator<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = -4748646871471052706L;
   private SerializablePredicate<? super R> filterFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 4ba3a7f..35be0cf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -34,7 +34,7 @@ import org.apache.heron.streamlet.SerializableFunction;
  * For every tuple, it applies the flatMapFunction, flattens the resulting
  * tuples and emits them
  */
-public class FlatMapOperator<R, T> extends StreamletOperator {
+public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -2418329215159618998L;
   private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index f37ebeb..7b9f71f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.Window;
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
  */
-public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator {
+public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator<V, V> {
   private static final long serialVersionUID = 2833576046687752396L;
   private SerializableFunction<V, K> keyExtractor;
   private VR identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 1aa3138..00c8d80 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -44,7 +44,7 @@ import org.apache.heron.streamlet.Window;
  * a config parameter. Also taken as parameters are which source is left and right.
  * This is needed for the semantics of outer/left/inner joins.
  */
-public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator {
+public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR> {
   private static final long serialVersionUID = 4875450390444745407L;
   private static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
   private static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index b05e6f1..7ee9be1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -33,7 +33,7 @@ import org.apache.heron.streamlet.SerializableFunction;
  * It takes in the mapFunction Function as the input.
  * For every tuple, it applies the mapFunction, and emits the resulting value
  */
-public class MapOperator<R, T> extends StreamletOperator {
+public class MapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -1303096133107278700L;
   private SerializableFunction<? super R, ? extends T> mapFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index cf6dc38..b0155df 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.Window;
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
  */
-public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator {
+public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
   private static final long serialVersionUID = 2833576046687750496L;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 27e6cd0..e2b3df6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,12 +23,15 @@ package org.apache.heron.streamlet.impl.operators;
 import org.apache.heron.api.bolt.BaseRichBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletOperator;
 
 /**
  * The Bolt interface that other operators of the streamlet packages extend.
  * The only common stuff amongst all of them is the output streams
  */
-public abstract class StreamletOperator extends BaseRichBolt {
+public abstract class StreamletOperator<R, T>
+    extends BaseRichBolt
+    implements IStreamletOperator<R, T> {
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index e9ff1ba..f90655e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -23,12 +23,15 @@ package org.apache.heron.streamlet.impl.operators;
 import org.apache.heron.api.bolt.BaseWindowedBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 
 /**
  * The Bolt interface that other windowed operators of the streamlet packages extend.
  * The only common stuff amongst all of them is the output streams
  */
-public abstract class StreamletWindowOperator extends BaseWindowedBolt {
+public abstract class StreamletWindowOperator<R, T>
+    extends BaseWindowedBolt
+    implements IStreamletWindowOperator<R, T> {
   private static final long serialVersionUID = -4836560876041237959L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index e588f8b..0bf5232 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -39,7 +39,7 @@ import org.apache.heron.streamlet.impl.ContextImpl;
  * It calls the transformFunction setup/cleanup at the beginning/end of the
  * processing. And for every tuple, it applies the transformFunction, and emits the resulting value
  */
-public class TransformOperator<R, T> extends StreamletOperator
+public class TransformOperator<R, T> extends StreamletOperator<R, T>
     implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 429297144878185182L;
   private SerializableTransformer<? super R, ? extends T> serializableTransformer;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index f061eae..151c0b4 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -31,7 +31,7 @@ import org.apache.heron.api.tuple.Values;
  * UnionOperator is the class that implements the union functionality.
  * Its a very simple bolt that re-emits every tuple that it sees.
  */
-public class UnionOperator<I> extends StreamletOperator {
+public class UnionOperator<I> extends StreamletOperator<I, I> {
   private static final long serialVersionUID = -7326832064961413315L;
   private OutputCollector collector;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index ab4dbc4..943e3c5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -37,7 +37,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * ConsumerSink is a very simple Sink that basically invokes a user supplied
  * consume function for every tuple.
  */
-public class ComplexSink<R> extends StreamletOperator
+public class ComplexSink<R> extends StreamletOperator<R, R>
     implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 8717991188885786658L;
   private Sink<R> sink;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index 687e944..af94704 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * ConsumerSink is a very simple Sink that basically invokes a user supplied
  * consume function for every tuple.
  */
-public class ConsumerSink<R> extends StreamletOperator {
+public class ConsumerSink<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = 8716140142187667638L;
   private SerializableConsumer<R> consumer;
   private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 37c36f4..7fdc3c9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * LogSink is a very simple Bolt that implements the log functionality.
  * It basically logs every tuple.
  */
-public class LogSink<R> extends StreamletOperator {
+public class LogSink<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = -6392422646613189818L;
   private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
   private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
new file mode 100644
index 0000000..7d4ae37
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletBasicOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from user defined CustomBasicOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined CustomBasicOperator
+   */
+  public CustomBasicStreamlet(StreamletImpl<R> parent,
+                              IStreamletBasicOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
new file mode 100644
index 0000000..25258d2
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from user defined CustomOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined CustomOperator
+   */
+  public CustomStreamlet(StreamletImpl<R> parent,
+                         IStreamletOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
new file mode 100644
index 0000000..8f05aa1
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomWindowStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletWindowOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from user defined CustomWindowOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined CustomWindowOperator
+   */
+  public CustomWindowStreamlet(StreamletImpl<R> parent,
+                              IStreamletWindowOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 88dc5c5..b2dc4ca 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,6 +19,9 @@
 package org.apache.heron.streamlet.scala
 
 import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -222,6 +225,30 @@ trait Streamlet[R] {
   def transform[T](
       serializableTransformer: SerializableTransformer[R, _ <: T]): Streamlet[T]
 
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
+
+  /**
+   * Returns a new Streamlet by applying the basic operator on each element of this streamlet.
+   * @param operator The basic operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
+
+  /**
+   * Returns a new Streamlet by applying the window operator on the elements of this streamlet.
+   * @param operator The window operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 6d780bd..bb1b28a 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,6 +21,9 @@ package org.apache.heron.streamlet.scala.impl
 import scala.collection.JavaConverters
 
 import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -319,6 +322,39 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
   }
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  override def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
+   * Returns a new Streamlet by applying the basic operator on each element of this streamlet.
+   * @param operator The basic operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
+   * Returns a new Streamlet by applying the window operator on the elements of this streamlet.
+   * @param operator The window operator to be applied
+   * @tparam T The element type of the returned streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
similarity index 56%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
index f061eae..a316eb1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
@@ -17,38 +17,38 @@
  * under the License.
  */
 
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
 
 import java.util.Map;
 
-import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.bolt.BaseBasicBolt;
+import org.apache.heron.api.bolt.BasicOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
-  private static final long serialVersionUID = -7326832064961413315L;
-  private OutputCollector collector;
+public class TestBasicBolt extends BaseBasicBolt {
+  private int tupleExecuted;
 
-  public UnionOperator() {
+  @Override
+  public void prepare(Map<String, Object> heronConf, TopologyContext context) {
+    this.tupleExecuted = 0;
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
+  public void execute(Tuple input, BasicOutputCollector collector) {
+    tupleExecuted++;
+    collector.emit(new Values(input.getString(0)));
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public void execute(Tuple tuple) {
-    I obj = (I) tuple.getValue(0);
-    collector.emit(new Values(obj));
-    collector.ack(tuple);
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("output"));
+  }
+
+  public int getExecuted() {
+    return tupleExecuted;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
similarity index 58%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestBolt.java
index f061eae..b01ff29 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
@@ -17,38 +17,43 @@
  * under the License.
  */
 
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
 
 import java.util.Map;
 
+import org.apache.heron.api.bolt.BaseRichBolt;
 import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
-  private static final long serialVersionUID = -7326832064961413315L;
-  private OutputCollector collector;
+public class TestBolt extends BaseRichBolt {
+  private OutputCollector outputCollector;
+  private int tupleExecuted;
 
-  public UnionOperator() {
+  @Override
+  public void prepare(
+      Map<String, Object> map,
+      TopologyContext topologyContext,
+      OutputCollector collector) {
+    this.outputCollector = collector;
+    this.tupleExecuted = 0;
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
+  public void execute(Tuple tuple) {
+    tupleExecuted++;
+    outputCollector.emit(new Values(tuple.getString(0)));
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public void execute(Tuple tuple) {
-    I obj = (I) tuple.getValue(0);
-    collector.emit(new Values(obj));
-    collector.ack(tuple);
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("output"));
+  }
+
+  public int getExecuted() {
+    return tupleExecuted;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
similarity index 53%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
index f061eae..099f394 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
@@ -17,38 +17,42 @@
  * under the License.
  */
 
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
 
 import java.util.Map;
 
+import org.apache.heron.api.bolt.BaseWindowedBolt;
 import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.topology.TopologyContext;
-import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.api.tuple.Values;
+import org.apache.heron.api.windowing.TupleWindow;
 
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
-  private static final long serialVersionUID = -7326832064961413315L;
-  private OutputCollector collector;
+public class TestWindowBolt extends BaseWindowedBolt {
+  private OutputCollector outputCollector;
+  private int tupleExecuted;
 
-  public UnionOperator() {
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.outputCollector = collector;
+    this.tupleExecuted = 0;
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
+  public void execute(TupleWindow inputWindow) {
+    tupleExecuted++;
+    outputCollector.emit(new Values(inputWindow.get().size()));
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public void execute(Tuple tuple) {
-    I obj = (I) tuple.getValue(0);
-    collector.emit(new Values(obj));
-    collector.ack(tuple);
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("count"));
+  }
+
+  public int getExecuted() {
+    return tupleExecuted;
   }
 }
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index cec9b45..87af426 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -30,13 +30,22 @@ import org.junit.Test;
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.resource.TestBasicBolt;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestWindowBolt;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -188,6 +197,61 @@ public class StreamletImplTest {
     assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
   }
 
+  private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyBoltOperator());
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
+  private class MyBasicBoltOperator extends TestBasicBolt
+      implements IStreamletBasicOperator<Double, Double> {
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromBasicBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyBasicBoltOperator());
+    assertTrue(streamlet instanceof CustomBasicStreamlet);
+    CustomBasicStreamlet<Double, Double> mStreamlet =
+        (CustomBasicStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
+  private class MyWindowBoltOperator extends TestWindowBolt
+      implements IStreamletWindowOperator<Double, Double> {
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromWindowBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyWindowBoltOperator());
+    assertTrue(streamlet instanceof CustomWindowStreamlet);
+    CustomWindowStreamlet<Double, Double> mStreamlet =
+        (CustomWindowStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testSimpleBuild() throws Exception {
diff --git a/heron/api/tests/scala/BUILD b/heron/api/tests/scala/BUILD
index 61ce328..cb533f4 100644
--- a/heron/api/tests/scala/BUILD
+++ b/heron/api/tests/scala/BUILD
@@ -1,6 +1,9 @@
 scala_test(
     name = "api-scala-test",
-    srcs = glob(["org/apache/heron/streamlet/scala/**/*.scala"]),
+    srcs = glob([
+        "org/apache/heron/streamlet/scala/**/*.scala",
+        "org/apache/heron/resource/**/*.scala"
+    ]),
     deps = [
         "//third_party/java:junit4",
         "//heron/api/src/scala:api-scala",
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
new file mode 100644
index 0000000..53ceac8
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseBasicBolt
+import org.apache.heron.api.bolt.BasicOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBasicBolt extends BaseBasicBolt {
+  var tupleExecuted: Int = 0
+  
+  override def prepare(heronConf: JMap[String, Object], context: TopologyContext): Unit = {
+  }
+
+  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val v = input.getDouble(0)
+    collector.emit(new Values(v))
+  }
+
+  
+  override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+    outputFieldsDeclarer.declare(new Fields("output"))
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
new file mode 100644
index 0000000..05b0ab3
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseRichBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBolt extends BaseRichBolt {
+  var outputCollector: OutputCollector = _
+  var tupleExecuted: Int = 0
+
+  override def prepare(
+      conf: JMap[String, Object],
+      topologyContext: TopologyContext,
+      collector: OutputCollector): Unit = {
+    outputCollector = collector
+  }
+
+  
+  override def execute(tuple: Tuple): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val v = tuple.getDouble(0)
+    outputCollector.emit(new Values(v))
+  }
+
+  override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+    outputFieldsDeclarer.declare(new Fields("output"))
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
new file mode 100644
index 0000000..863b14d
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.lang.{Double => JDouble}
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseWindowedBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+import org.apache.heron.api.windowing.TupleWindow
+
+class TestWindowBolt extends BaseWindowedBolt {
+  var outputCollector: OutputCollector = _
+  var tupleExecuted: Int = 0
+  
+  override def prepare(topoConf: JMap[String, Object], context: TopologyContext,
+                       collector: OutputCollector): Unit = {
+    this.outputCollector = collector
+  }
+
+  override def execute(inputWindow: TupleWindow): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val size: JDouble = inputWindow.get().size()
+    outputCollector.emit(new Values(size))
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    declarer.declare(new Fields("count"))
+  }
+}
+
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 3c6d073..bdaf751 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -22,9 +22,22 @@ import scala.util.Random
 
 import org.junit.Assert.{assertEquals, assertTrue}
 
-import org.apache.heron.streamlet.WindowConfig
+import org.apache.heron.resource.{
+  TestBasicBolt,
+  TestBolt,
+  TestWindowBolt
+}
+import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
+  WindowConfig
+}
 import org.apache.heron.streamlet.impl.streamlets.{
   ConsumerStreamlet,
+  CustomBasicStreamlet,
+  CustomStreamlet,
+  CustomWindowStreamlet,
   FilterStreamlet,
   FlatMapStreamlet,
   LogStreamlet,
@@ -394,6 +407,153 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, transformStreamlet.getChildren.size())
   }
 
+  private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+  }
+
+  test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+    
+    val testOperator = new MyBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator)
+      .setName("Custom_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size())
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomStreamlet[Double, Double]]
+    assertEquals("Custom_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size())
+  }
+
+  private class MyBasicBoltOperator extends TestBasicBolt
+      with IStreamletBasicOperator[Double, Double] {
+  }
+
+  test("StreamletImpl should support applyOperator operation on IStreamletBasicOperator") {
+    val testOperator = new MyBasicBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator)
+      .setName("CustomBasic_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size())
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomBasicStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+    assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size())
+  }
+
+  private class MyWindowBoltOperator extends TestWindowBolt
+      with IStreamletWindowOperator[Double, Double] {
+  }
+
+  test("StreamletImpl should support applyOperator operation on IStreamletWindowOperator") {
+    val testOperator = new MyWindowBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator)
+      .setName("CustomWindow_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size)
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size())
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomWindowStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+    assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size())
+  }
+
   test("StreamletImpl should support reduce operation") {
     val supplierStreamlet = builder
       .newSource(() => Random.nextInt(10))