You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/11/03 03:01:12 UTC

[incubator-heron] branch master updated: Nwang/refactor custom operator interface v2 (#3100)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a64f6  Nwang/refactor custom operator interface v2 (#3100)
34a64f6 is described below

commit 34a64f6eec69e22a54187d03d0fb4ad8ca2801cf
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Fri Nov 2 20:01:05 2018 -0700

    Nwang/refactor custom operator interface v2 (#3100)
    
    * Reduce number of applyOperator() functions in Streamlet
    
    * Remove CustomBasicStreamlet and CustomWindowStreamlet
---
 .../heron/streamlet/IStreamletBasicOperator.java   |  2 +-
 .../apache/heron/streamlet/IStreamletOperator.java |  7 +--
 ...etOperator.java => IStreamletRichOperator.java} |  2 +-
 .../heron/streamlet/IStreamletWindowOperator.java  |  2 +-
 .../java/org/apache/heron/streamlet/Streamlet.java | 16 -------
 .../apache/heron/streamlet/impl/StreamletImpl.java | 34 +------------
 .../impl/operators/StreamletOperator.java          |  4 +-
 .../impl/streamlets/CustomBasicStreamlet.java      | 55 ----------------------
 .../streamlet/impl/streamlets/CustomStreamlet.java | 29 +++++++++---
 .../impl/streamlets/CustomWindowStreamlet.java     | 55 ----------------------
 .../apache/heron/streamlet/scala/Streamlet.scala   | 18 -------
 .../heron/streamlet/scala/impl/StreamletImpl.scala | 24 ----------
 .../heron/streamlet/impl/StreamletImplTest.java    | 18 ++++---
 .../streamlet/scala/impl/StreamletImplTest.scala   | 16 +++----
 14 files changed, 47 insertions(+), 235 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
index e6b615a..98b87f3 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IBasicBolt;
  * The interface for streamlet operators. It can be used to create
  * operators based on existing Bolts (subclasses of IBasicBolt).
  */
-public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+public interface IStreamletBasicOperator<R, T> extends IStreamletOperator<R, T>, IBasicBolt {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
index 24b989c..0eecb60 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -20,11 +20,8 @@
 
 package org.apache.heron.streamlet;
 
-import org.apache.heron.api.bolt.IRichBolt;
-
 /**
- * The interface for custom operators: it can be used to create
- * operators based on existing Bolts (subclasses of IRichBolt).
+ * The base interface for all Streamlet operator interfaces.
  */
-public interface IStreamletOperator<R, T> extends IRichBolt {
+public interface IStreamletOperator<R, T> {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
similarity index 91%
copy from heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
copy to heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
index 24b989c..c6800b8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IRichBolt;
  * The interface for custom operators: it can be used to create
  * operators based on existing Bolts (subclasses of IRichBolt).
  */
-public interface IStreamletOperator<R, T> extends IRichBolt {
+public interface IStreamletRichOperator<R, T> extends IStreamletOperator<R, T>, IRichBolt {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
index 7369fca..dcccae1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IWindowedBolt;
  * The interface for streamlet operators. It can be used to create
  * operators based on existing Bolts (subclasses of IWindowedBolt).
  */
-public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+public interface IStreamletWindowOperator<R, T> extends IStreamletOperator<R, T>, IWindowedBolt {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 2490537..b5fe931 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -210,22 +210,6 @@ public interface Streamlet<R> {
   <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
 
   /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
-
-  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 677b34e..b1c427a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,9 +27,7 @@ import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletOperator;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.JoinType;
 import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
@@ -45,9 +43,7 @@ import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -218,6 +214,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     if (built) {
       throw new RuntimeException("Logic Error While building " + getName());
     }
+
     if (doBuild(bldr, stageNames)) {
       built = true;
       for (StreamletImpl<?> streamlet : children) {
@@ -500,34 +497,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
    */
   @Override
   public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
-    CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
-    addChild(customStreamlet);
-    return customStreamlet;
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  @Override
-  public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
-    CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
-    addChild(customStreamlet);
-    return customStreamlet;
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  @Override
-  public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
-    CustomWindowStreamlet<R, T> customStreamlet =
-        new CustomWindowStreamlet<>(this, operator);
+    StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
     addChild(customStreamlet);
     return customStreamlet;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index e2b3df6..75e9b7f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,7 +23,7 @@ package org.apache.heron.streamlet.impl.operators;
 import org.apache.heron.api.bolt.BaseRichBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
 
 /**
  * The Bolt interface that other operators of the streamlet packages extend.
@@ -31,7 +31,7 @@ import org.apache.heron.streamlet.IStreamletOperator;
  */
 public abstract class StreamletOperator<R, T>
     extends BaseRichBolt
-    implements IStreamletOperator<R, T> {
+    implements IStreamletRichOperator<R, T> {
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
deleted file mode 100644
index 7d4ae37..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
-  private StreamletImpl<R> parent;
-  private IStreamletBasicOperator<R, T> op;
-
-  /**
-   * Create a custom streamlet from user defined CustomBasicOperator object.
-   * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeBasicOperator
-   */
-  public CustomBasicStreamlet(StreamletImpl<R> parent,
-                              IStreamletBasicOperator<R, T> op) {
-    this.parent = parent;
-    this.op = op;
-    setNumPartitions(parent.getNumPartitions());
-  }
-
-  @Override
-  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
-    return true;
-  }
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index 25258d2..b0f71c0 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -23,7 +23,10 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 
 /**
@@ -32,24 +35,38 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
  */
 public class CustomStreamlet<R, T> extends StreamletImpl<T> {
   private StreamletImpl<R> parent;
-  private IStreamletOperator<R, T> op;
+  private IStreamletOperator<R, T> operator;
 
   /**
    * Create a custom streamlet from user defined CustomOperator object.
    * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeOperator
+   * @param operator The user defined CustomOperator
    */
   public CustomStreamlet(StreamletImpl<R> parent,
-                         IStreamletOperator<R, T> op) {
+                         IStreamletOperator<R, T> operator) {
     this.parent = parent;
-    this.op = op;
+    this.operator = operator;
     setNumPartitions(parent.getNumPartitions());
   }
 
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    if (operator instanceof IStreamletBasicOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+      IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else if (operator instanceof IStreamletRichOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+      IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else if (operator instanceof IStreamletWindowOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+      IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else {
+      throw new RuntimeException("Unhandled operator class is found!");
+    }
+
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
deleted file mode 100644
index 8f05aa1..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomWindowOperator represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
-  private StreamletImpl<R> parent;
-  private IStreamletWindowOperator<R, T> op;
-
-  /**
-   * Create a custom streamlet from user defined CustomWindowOperator object.
-   * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeWindowOperator
-   */
-  public CustomWindowStreamlet(StreamletImpl<R> parent,
-                              IStreamletWindowOperator<R, T> op) {
-    this.parent = parent;
-    this.op = op;
-    setNumPartitions(parent.getNumPartitions());
-  }
-
-  @Override
-  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
-    return true;
-  }
-}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index b2dc4ca..5a11c75 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,9 +19,7 @@
 package org.apache.heron.streamlet.scala
 
 import org.apache.heron.streamlet.{
-  IStreamletBasicOperator,
   IStreamletOperator,
-  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -234,22 +232,6 @@ trait Streamlet[R] {
   def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
 
   /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
-
-  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index bb1b28a..44b6ede 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,9 +21,7 @@ package org.apache.heron.streamlet.scala.impl
 import scala.collection.JavaConverters
 
 import org.apache.heron.streamlet.{
-  IStreamletBasicOperator,
   IStreamletOperator,
-  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -333,28 +331,6 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
   }
 
   /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
-    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
-    fromJavaStreamlet(newJavaStreamlet)
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
-    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
-    fromJavaStreamlet(newJavaStreamlet)
-  }
-
-  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
     */
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 87af426..5fd12e0 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -36,16 +36,14 @@ import org.apache.heron.resource.TestWindowBolt;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
 import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -197,7 +195,7 @@ public class StreamletImplTest {
     assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
   }
 
-  private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+  private class MyBoltOperator extends TestBolt implements IStreamletRichOperator<Double, Double> {
   }
 
   @Test
@@ -224,9 +222,9 @@ public class StreamletImplTest {
     Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyBasicBoltOperator());
-    assertTrue(streamlet instanceof CustomBasicStreamlet);
-    CustomBasicStreamlet<Double, Double> mStreamlet =
-        (CustomBasicStreamlet<Double, Double>) streamlet;
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet =
+        (CustomStreamlet<Double, Double>) streamlet;
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
@@ -243,9 +241,9 @@ public class StreamletImplTest {
     Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyWindowBoltOperator());
-    assertTrue(streamlet instanceof CustomWindowStreamlet);
-    CustomWindowStreamlet<Double, Double> mStreamlet =
-        (CustomWindowStreamlet<Double, Double>) streamlet;
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet =
+        (CustomStreamlet<Double, Double>) streamlet;
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index bdaf751..b843732 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -29,15 +29,13 @@ import org.apache.heron.resource.{
 }
 import org.apache.heron.streamlet.{
   IStreamletBasicOperator,
-  IStreamletOperator,
+  IStreamletRichOperator,
   IStreamletWindowOperator,
   WindowConfig
 }
 import org.apache.heron.streamlet.impl.streamlets.{
   ConsumerStreamlet,
-  CustomBasicStreamlet,
   CustomStreamlet,
-  CustomWindowStreamlet,
   FilterStreamlet,
   FlatMapStreamlet,
   LogStreamlet,
@@ -407,10 +405,10 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, transformStreamlet.getChildren.size())
   }
 
-  private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+  private class MyBoltOperator extends TestBolt with IStreamletRichOperator[Double, Double] {
   }
 
-  test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+  test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") {
     
     val testOperator = new MyBoltOperator()
     val supplierStreamlet = builder
@@ -495,11 +493,11 @@ class StreamletImplTest extends BaseFunSuite {
       mapStreamlet
         .getChildren()
         .get(0)
-        .isInstanceOf[CustomBasicStreamlet[_, _]])
+        .isInstanceOf[CustomStreamlet[_, _]])
     val customStreamlet = mapStreamlet
       .getChildren()
       .get(0)
-      .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+      .asInstanceOf[CustomStreamlet[Double, Double]]
     assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
     assertEquals(7, customStreamlet.getNumPartitions)
     assertEquals(0, customStreamlet.getChildren.size())
@@ -544,11 +542,11 @@ class StreamletImplTest extends BaseFunSuite {
       mapStreamlet
         .getChildren()
         .get(0)
-        .isInstanceOf[CustomWindowStreamlet[_, _]])
+        .isInstanceOf[CustomStreamlet[_, _]])
     val customStreamlet = mapStreamlet
       .getChildren()
       .get(0)
-      .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+      .asInstanceOf[CustomStreamlet[Double, Double]]
     assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
     assertEquals(7, customStreamlet.getNumPartitions)
     assertEquals(0, customStreamlet.getChildren.size())