You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/10/24 18:28:01 UTC

[GitHub] nlu90 closed pull request #3051: Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea…

nlu90 closed pull request #3051: Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea…
URL: https://github.com/apache/incubator-heron/pull/3051
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
new file mode 100644
index 0000000000..e6b615aa64
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IBasicBolt;
+
+/**
+ * The interface for custom basic operators: it can be used to create
+ * operators based on existing Bolts (implementations of IBasicBolt).
+ */
+public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
new file mode 100644
index 0000000000..24b989c26b
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IRichBolt;
+
+/**
+ * The interface for custom operators: it can be used to create
+ * operators based on existing Bolts (implementations of IRichBolt).
+ */
+public interface IStreamletOperator<R, T> extends IRichBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
new file mode 100644
index 0000000000..7369fcaa1a
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IWindowedBolt;
+
+/**
+ * The interface for custom window operators: it can be used to create
+ * operators based on existing Bolts (implementations of IWindowedBolt).
+ */
+public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 82dbef53b2..2490537ab2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -191,7 +191,7 @@
   Streamlet<R> union(Streamlet<? extends R> other);
 
   /**
-   * Returns a  new Streamlet by applying the transformFunction on each element of this streamlet.
+   * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
    * Before starting to cycle the transformFunction over the Streamlet, the open function is called.
    * This allows the transform Function to do any kind of initialization/loading, etc.
    * @param serializableTransformer The transformation function to be applied
@@ -201,6 +201,30 @@
   <T> Streamlet<T> transform(
       SerializableTransformer<R, ? extends T> serializableTransformer);
 
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The basic operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The window operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
+
   /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index fb5b950e54..677b34ef96 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,6 +27,9 @@
 import java.util.logging.Logger;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.JoinType;
 import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
@@ -42,6 +45,9 @@
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -100,6 +106,9 @@ public boolean allBuilt() {
 
   protected enum StreamletNamePrefix {
     CONSUMER("consumer"),
+    CUSTOM("custom"),
+    CUSTOM_BASIC("customBasic"),
+    CUSTOM_WINDOW("customWindow"),
     FILTER("filter"),
     FLATMAP("flatmap"),
     REDUCE("reduceByKeyAndWindow"),
@@ -483,6 +492,46 @@ public void toSink(Sink<R> sink) {
     return transformStreamlet;
   }
 
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+    CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The basic operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
+    CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The window operator to be applied
+   * @param <T> The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
+    CustomWindowStreamlet<R, T> customStreamlet =
+        new CustomWindowStreamlet<>(this, operator);
+    addChild(customStreamlet);
+    return customStreamlet;
+  }
+
   /**
    * Verifies the requirement as the utility function.
    * @param requirement The requirement to verify
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index 9b42ee6f34..af9afe807e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -34,7 +34,7 @@
  * For every tuple that it encounters, the filter function is run
  * and the tuple is re-emitted if the predicate evaluates to true
  */
-public class FilterOperator<R> extends StreamletOperator {
+public class FilterOperator<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = -4748646871471052706L;
   private SerializablePredicate<? super R> filterFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 4ba3a7f7f9..35be0cf5d8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -34,7 +34,7 @@
  * For every tuple, it applies the flatMapFunction, flattens the resulting
  * tuples and emits them
  */
-public class FlatMapOperator<R, T> extends StreamletOperator {
+public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -2418329215159618998L;
   private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index f37ebeb646..7b9f71f503 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
  */
-public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator {
+public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator<V, V> {
   private static final long serialVersionUID = 2833576046687752396L;
   private SerializableFunction<V, K> keyExtractor;
   private VR identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 1aa31382a2..00c8d80063 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -44,7 +44,7 @@
  * a config parameter. Also taken as parameters are which source is left and right.
  * This is needed for the semantics of outer/left/inner joins.
  */
-public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator {
+public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR> {
   private static final long serialVersionUID = 4875450390444745407L;
   private static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
   private static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index b05e6f19e2..7ee9be1760 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -33,7 +33,7 @@
  * It takes in the mapFunction Function as the input.
  * For every tuple, it applies the mapFunction, and emits the resulting value
  */
-public class MapOperator<R, T> extends StreamletOperator {
+public class MapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -1303096133107278700L;
   private SerializableFunction<? super R, ? extends T> mapFn;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index cf6dc38a4a..b0155df918 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
  */
-public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator {
+public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
   private static final long serialVersionUID = 2833576046687750496L;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 27e6cd00cd..e2b3df6370 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,12 +23,15 @@
 import org.apache.heron.api.bolt.BaseRichBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletOperator;
 
 /**
  * The Bolt interface that other operators of the streamlet packages extend.
  * The only common stuff amongst all of them is the output streams
  */
-public abstract class StreamletOperator extends BaseRichBolt {
+public abstract class StreamletOperator<R, T>
+    extends BaseRichBolt
+    implements IStreamletOperator<R, T> {
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index e9ff1ba5a8..f90655e553 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -23,12 +23,15 @@
 import org.apache.heron.api.bolt.BaseWindowedBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 
 /**
  * The Bolt interface that other windowed operators of the streamlet packages extend.
  * The only common stuff amongst all of them is the output streams
  */
-public abstract class StreamletWindowOperator extends BaseWindowedBolt {
+public abstract class StreamletWindowOperator<R, T>
+    extends BaseWindowedBolt
+    implements IStreamletWindowOperator<R, T> {
   private static final long serialVersionUID = -4836560876041237959L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index e588f8bc99..0bf52322f6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -39,7 +39,7 @@
  * It calls the transformFunction setup/cleanup at the beginning/end of the
  * processing. And for every tuple, it applies the transformFunction, and emits the resulting value
  */
-public class TransformOperator<R, T> extends StreamletOperator
+public class TransformOperator<R, T> extends StreamletOperator<R, T>
     implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 429297144878185182L;
   private SerializableTransformer<? super R, ? extends T> serializableTransformer;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index f061eaeb0a..151c0b45f1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -31,7 +31,7 @@
  * UnionOperator is the class that implements the union functionality.
  * Its a very simple bolt that re-emits every tuple that it sees.
  */
-public class UnionOperator<I> extends StreamletOperator {
+public class UnionOperator<I> extends StreamletOperator<I, I> {
   private static final long serialVersionUID = -7326832064961413315L;
   private OutputCollector collector;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index ab4dbc4cd4..943e3c588e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -37,7 +37,7 @@
  * ConsumerSink is a very simple Sink that basically invokes a user supplied
  * consume function for every tuple.
  */
-public class ComplexSink<R> extends StreamletOperator
+public class ComplexSink<R> extends StreamletOperator<R, R>
     implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 8717991188885786658L;
   private Sink<R> sink;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index 687e94410f..af947048b5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -32,7 +32,7 @@
  * ConsumerSink is a very simple Sink that basically invokes a user supplied
  * consume function for every tuple.
  */
-public class ConsumerSink<R> extends StreamletOperator {
+public class ConsumerSink<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = 8716140142187667638L;
   private SerializableConsumer<R> consumer;
   private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 37c36f4fe8..7fdc3c95ee 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -32,7 +32,7 @@
  * LogSink is a very simple Bolt that implements the log functionality.
  * It basically logs every tuple.
  */
-public class LogSink<R> extends StreamletOperator {
+public class LogSink<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = -6392422646613189818L;
   private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
   private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
new file mode 100644
index 0000000000..7d4ae37da2
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletBasicOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from a user defined IStreamletBasicOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined IStreamletBasicOperator
+   */
+  public CustomBasicStreamlet(StreamletImpl<R> parent,
+                              IStreamletBasicOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
new file mode 100644
index 0000000000..25258d2009
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from a user defined IStreamletOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined IStreamletOperator
+   */
+  public CustomStreamlet(StreamletImpl<R> parent,
+                         IStreamletOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
new file mode 100644
index 0000000000..8f05aa1fa3
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomWindowStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
+  private StreamletImpl<R> parent;
+  private IStreamletWindowOperator<R, T> op;
+
+  /**
+   * Create a custom streamlet from a user defined IStreamletWindowOperator object.
+   * @param parent The parent(upstream) streamlet object
+   * @param op The user defined IStreamletWindowOperator
+   */
+  public CustomWindowStreamlet(StreamletImpl<R> parent,
+                              IStreamletWindowOperator<R, T> op) {
+    this.parent = parent;
+    this.op = op;
+    setNumPartitions(parent.getNumPartitions());
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    return true;
+  }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 88dc5c5986..b2dc4ca31d 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,6 +19,9 @@
 package org.apache.heron.streamlet.scala
 
 import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -222,6 +225,30 @@ trait Streamlet[R] {
   def transform[T](
       serializableTransformer: SerializableTransformer[R, _ <: T]): Streamlet[T]
 
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The operator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The basic operator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this streamlet.
+   * @param operator The window operator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 6d780bdbd2..bb1b28ac3b 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,6 +21,9 @@ package org.apache.heron.streamlet.scala.impl
 import scala.collection.JavaConverters
 
 import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -318,6 +321,39 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     fromJavaStreamlet(newJavaStreamlet)
   }
 
+  /**
+   * Applies the given IStreamletOperator by delegating to the wrapped Java streamlet.
+   * @param operator The IStreamletOperator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Scala Streamlet wrapping the Java streamlet returned by the delegate call
+   */
+  override def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
+   * Applies the given IStreamletBasicOperator by delegating to the wrapped Java streamlet.
+   * @param operator The IStreamletBasicOperator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Scala Streamlet wrapping the Java streamlet returned by the delegate call
+   */
+  override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
+   * Applies the given IStreamletWindowOperator by delegating to the wrapped Java streamlet.
+   * @param operator The IStreamletWindowOperator to be applied
+   * @tparam T The element type of the returned Streamlet
+   * @return Scala Streamlet wrapping the Java streamlet returned by the delegate call
+   */
+  override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
new file mode 100644
index 0000000000..a316eb1f4e
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseBasicBolt;
+import org.apache.heron.api.bolt.BasicOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+
+public class TestBasicBolt extends BaseBasicBolt { // test bolt: forwards field 0, counts calls
+  private int tupleExecuted; // number of execute() calls since the last prepare()
+
+  @Override
+  public void prepare(Map<String, Object> heronConf, TopologyContext context) {
+    this.tupleExecuted = 0; // the counter restarts on every prepare()
+  }
+
+  @Override
+  public void execute(Tuple input, BasicOutputCollector collector) {
+    tupleExecuted++;
+    collector.emit(new Values(input.getString(0))); // field 0 is read as a String
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("output")); // one output field named "output"
+  }
+
+  public int getExecuted() {
+    return tupleExecuted; // execute() call count, for test assertions
+  }
+}
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestBolt.java b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
new file mode 100644
index 0000000000..b01ff29e88
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+
+public class TestBolt extends BaseRichBolt { // test bolt: forwards field 0, counts calls
+  private OutputCollector outputCollector; // assigned in prepare(), used by execute()
+  private int tupleExecuted; // number of execute() calls since the last prepare()
+
+  @Override
+  public void prepare(
+      Map<String, Object> map,
+      TopologyContext topologyContext,
+      OutputCollector collector) {
+    this.outputCollector = collector;
+    this.tupleExecuted = 0; // the counter restarts on every prepare()
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    tupleExecuted++;
+    outputCollector.emit(new Values(tuple.getString(0))); // field 0 is read as a String
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("output")); // one output field named "output"
+  }
+
+  public int getExecuted() {
+    return tupleExecuted; // execute() call count, for test assertions
+  }
+}
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
new file mode 100644
index 0000000000..099f3946d1
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.api.windowing.TupleWindow;
+
+public class TestWindowBolt extends BaseWindowedBolt { // test bolt: emits each window's size
+  private OutputCollector outputCollector; // assigned in prepare(), used by execute()
+  private int tupleExecuted; // number of execute() calls (one per window) since prepare()
+
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.outputCollector = collector;
+    this.tupleExecuted = 0; // the counter restarts on every prepare()
+  }
+
+  @Override
+  public void execute(TupleWindow inputWindow) {
+    tupleExecuted++;
+    outputCollector.emit(new Values(inputWindow.get().size())); // emits the size as an int
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("count")); // one output field named "count"
+  }
+
+  public int getExecuted() {
+    return tupleExecuted; // execute() call count, for test assertions
+  }
+}
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index cec9b456a8..87af426de2 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -30,13 +30,22 @@
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.resource.TestBasicBolt;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestWindowBolt;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -188,6 +197,61 @@ public void cleanup() {
     assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
   }
 
+  private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+  } // NOTE(review): TestBolt.execute reads getString(0) despite <Double, Double>; confirm
+
+  @Test // applyOperator(IStreamletOperator) must add a CustomStreamlet child to the source
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyBoltOperator());
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(1, supplierStreamlet.getChildren().size()); // JUnit order: expected, actual
+    assertEquals(streamlet, supplierStreamlet.getChildren().get(0));
+  }
+
+  private class MyBasicBoltOperator extends TestBasicBolt
+      implements IStreamletBasicOperator<Double, Double> {
+  } // NOTE(review): TestBasicBolt.execute reads getString(0) despite <Double, Double>; confirm
+
+  @Test // applyOperator(IStreamletBasicOperator) must add a CustomBasicStreamlet child
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromBasicBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyBasicBoltOperator());
+    assertTrue(streamlet instanceof CustomBasicStreamlet);
+    CustomBasicStreamlet<Double, Double> mStreamlet =
+        (CustomBasicStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(1, supplierStreamlet.getChildren().size()); // JUnit order: expected, actual
+    assertEquals(streamlet, supplierStreamlet.getChildren().get(0));
+  }
+
+  private class MyWindowBoltOperator extends TestWindowBolt
+      implements IStreamletWindowOperator<Double, Double> {
+  } // NOTE(review): TestWindowBolt emits an int window size despite <Double, Double>; confirm
+
+  @Test // applyOperator(IStreamletWindowOperator) must add a CustomWindowStreamlet child
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletFromWindowBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new MyWindowBoltOperator());
+    assertTrue(streamlet instanceof CustomWindowStreamlet);
+    CustomWindowStreamlet<Double, Double> mStreamlet =
+        (CustomWindowStreamlet<Double, Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+    assertEquals(1, supplierStreamlet.getChildren().size()); // JUnit order: expected, actual
+    assertEquals(streamlet, supplierStreamlet.getChildren().get(0));
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testSimpleBuild() throws Exception {
diff --git a/heron/api/tests/scala/BUILD b/heron/api/tests/scala/BUILD
index 61ce32861e..cb533f4eb3 100644
--- a/heron/api/tests/scala/BUILD
+++ b/heron/api/tests/scala/BUILD
@@ -1,6 +1,9 @@
 scala_test(
     name = "api-scala-test",
-    srcs = glob(["org/apache/heron/streamlet/scala/**/*.scala"]),
+    srcs = glob([
+        "org/apache/heron/streamlet/scala/**/*.scala",
+        "org/apache/heron/resource/**/*.scala"
+    ]),
     deps = [
         "//third_party/java:junit4",
         "//heron/api/src/scala:api-scala",
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
new file mode 100644
index 0000000000..53ceac8d95
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseBasicBolt
+import org.apache.heron.api.bolt.BasicOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBasicBolt extends BaseBasicBolt { // test bolt: forwards field 0, counts calls
+  var tupleExecuted: Int = 0 // number of execute() calls on this instance
+  
+  override def prepare(heronConf: JMap[String, Object], context: TopologyContext): Unit = {
+  } // no setup is done; the counter is not reset here
+
+  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val v = input.getDouble(0) // field 0 is read as a Double
+    collector.emit(new Values(v))
+  }
+
+  
+  override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+    outputFieldsDeclarer.declare(new Fields("output")) // one output field named "output"
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
new file mode 100644
index 0000000000..05b0ab36f7
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseRichBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBolt extends BaseRichBolt { // test bolt: forwards field 0, counts calls
+  var outputCollector: OutputCollector = _ // assigned in prepare(), used by execute()
+  var tupleExecuted: Int = 0 // number of execute() calls on this instance
+
+  override def prepare(
+      conf: JMap[String, Object],
+      topologyContext: TopologyContext,
+      collector: OutputCollector): Unit = {
+    outputCollector = collector // the counter is not reset here
+  }
+
+  
+  override def execute(tuple: Tuple): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val v = tuple.getDouble(0) // field 0 is read as a Double
+    outputCollector.emit(new Values(v))
+  }
+
+  override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+    outputFieldsDeclarer.declare(new Fields("output")) // one output field named "output"
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
new file mode 100644
index 0000000000..863b14d7a6
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.lang.{Double => JDouble}
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseWindowedBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+import org.apache.heron.api.windowing.TupleWindow
+
+class TestWindowBolt extends BaseWindowedBolt { // test bolt: emits each window's size
+  var outputCollector: OutputCollector = _ // assigned in prepare(), used by execute()
+  var tupleExecuted: Int = 0 // number of execute() calls (one per window)
+  
+  override def prepare(topoConf: JMap[String, Object], context: TopologyContext,
+                       collector: OutputCollector): Unit = {
+    this.outputCollector = collector // the counter is not reset here
+  }
+
+  override def execute(inputWindow: TupleWindow): Unit = {
+    tupleExecuted = tupleExecuted + 1
+    val size: JDouble = inputWindow.get().size() // window size, typed as java.lang.Double
+    outputCollector.emit(new Values(size))
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    declarer.declare(new Fields("count")) // one output field named "count"
+  }
+}
+
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 3c6d073ef0..bdaf7517ac 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -22,9 +22,22 @@ import scala.util.Random
 
 import org.junit.Assert.{assertEquals, assertTrue}
 
-import org.apache.heron.streamlet.WindowConfig
+import org.apache.heron.resource.{
+  TestBasicBolt,
+  TestBolt,
+  TestWindowBolt
+}
+import org.apache.heron.streamlet.{
+  IStreamletBasicOperator,
+  IStreamletOperator,
+  IStreamletWindowOperator,
+  WindowConfig
+}
 import org.apache.heron.streamlet.impl.streamlets.{
   ConsumerStreamlet,
+  CustomBasicStreamlet,
+  CustomStreamlet,
+  CustomWindowStreamlet,
   FilterStreamlet,
   FlatMapStreamlet,
   LogStreamlet,
@@ -394,6 +407,153 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, transformStreamlet.getChildren.size())
   }
 
+  private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+  } // fixture: TestBolt typed as an IStreamletOperator; adds no members of its own
+
+  test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+    
+    val testOperator = new MyBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator) // custom operator chained after the map stage
+      .setName("Custom_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size) // source has only the map child
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size()) // map has only the custom child
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomStreamlet[Double, Double]]
+    assertEquals("Custom_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size()) // custom streamlet is the leaf
+  }
+
+  private class MyBasicBoltOperator extends TestBasicBolt
+      with IStreamletBasicOperator[Double, Double] {
+  } // fixture: TestBasicBolt typed as an IStreamletBasicOperator; adds no members of its own
+
+  test("StreamletImpl should support applyOperator operation on IStreamletBasicOperator") {
+    val testOperator = new MyBasicBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator) // basic operator chained after the map stage
+      .setName("CustomBasic_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size) // source has only the map child
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size()) // map has only the custom child
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomBasicStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+    assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size()) // custom streamlet is the leaf
+  }
+
+  private class MyWindowBoltOperator extends TestWindowBolt
+      with IStreamletWindowOperator[Double, Double] {
+  } // fixture: TestWindowBolt typed as an IStreamletWindowOperator; adds no members of its own
+
+  test("StreamletImpl should support applyOperator operation on IStreamletWindowOperator") {
+    val testOperator = new MyWindowBoltOperator()
+    val supplierStreamlet = builder
+      .newSource(() => Random.nextDouble())
+      .setName("Supplier_Streamlet_1")
+      .setNumPartitions(3)
+
+    supplierStreamlet
+      .map[Double] { num: Double =>
+        num * 10
+      }
+      .setName("Map_Streamlet_1")
+      .setNumPartitions(2)
+      .applyOperator(testOperator) // window operator chained after the map stage
+      .setName("CustomWindow_Streamlet_1")
+      .setNumPartitions(7)
+
+    val supplierStreamletImpl =
+      supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+    assertEquals(1, supplierStreamletImpl.getChildren.size) // source has only the map child
+    assertTrue(
+      supplierStreamletImpl
+        .getChildren(0)
+        .isInstanceOf[MapStreamlet[_, _]])
+    val mapStreamlet = supplierStreamletImpl
+      .getChildren(0)
+      .asInstanceOf[MapStreamlet[Double, Double]]
+    assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+    assertEquals(2, mapStreamlet.getNumPartitions)
+    assertEquals(1, mapStreamlet.getChildren.size()) // map has only the custom child
+
+    assertTrue(
+      mapStreamlet
+        .getChildren()
+        .get(0)
+        .isInstanceOf[CustomWindowStreamlet[_, _]])
+    val customStreamlet = mapStreamlet
+      .getChildren()
+      .get(0)
+      .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+    assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
+    assertEquals(7, customStreamlet.getNumPartitions)
+    assertEquals(0, customStreamlet.getChildren.size()) // custom streamlet is the leaf
+  }
+
   test("StreamletImpl should support reduce operation") {
     val supplierStreamlet = builder
       .newSource(() => Random.nextInt(10))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services