You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/11/03 03:01:07 UTC

[GitHub] nlu90 closed pull request #3100: Nwang/refactor custom operator interface v2

nlu90 closed pull request #3100: Nwang/refactor custom operator interface v2
URL: https://github.com/apache/incubator-heron/pull/3100
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff after the merge, so the diff is supplied below:

diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
index e6b615aa64..98b87f355c 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -26,5 +26,5 @@
  * The interface for streamlet operators. It can be used to create
  * operators based on existing Bolts (subclasses of IBasicBolt).
  */
-public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+public interface IStreamletBasicOperator<R, T> extends IStreamletOperator<R, T>, IBasicBolt {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
index 24b989c26b..0eecb60b49 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -20,11 +20,8 @@
 
 package org.apache.heron.streamlet;
 
-import org.apache.heron.api.bolt.IRichBolt;
-
 /**
- * The interface for custom operators: it can be used to create
- * operators based on existing Bolts (subclasses of IRichBolt).
+ * The base interface for all Streamlet operator interfaces.
  */
-public interface IStreamletOperator<R, T> extends IRichBolt {
+public interface IStreamletOperator<R, T> {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
new file mode 100644
index 0000000000..c6800b896f
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IRichBolt;
+
+/**
+ * The interface for custom operators: it can be used to create
+ * operators based on existing Bolts (subclasses of IRichBolt).
+ */
+public interface IStreamletRichOperator<R, T> extends IStreamletOperator<R, T>, IRichBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
index 7369fcaa1a..dcccae1184 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -26,5 +26,5 @@
  * The interface for streamlet operators. It can be used to create
  * operators based on existing Bolts (subclasses of IWindowedBolt).
  */
-public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+public interface IStreamletWindowOperator<R, T> extends IStreamletOperator<R, T>, IWindowedBolt {
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 2490537ab2..b5fe93162f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -209,22 +209,6 @@
    */
   <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
 
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
-
   /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 677b34ef96..b1c427a909 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,9 +27,7 @@
 import java.util.logging.Logger;
 
 import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletOperator;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.JoinType;
 import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
@@ -45,9 +43,7 @@
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -218,6 +214,7 @@ public void build(TopologyBuilder bldr, Set<String> stageNames) {
     if (built) {
       throw new RuntimeException("Logic Error While building " + getName());
     }
+
     if (doBuild(bldr, stageNames)) {
       built = true;
       for (StreamletImpl<?> streamlet : children) {
@@ -500,34 +497,7 @@ public void toSink(Sink<R> sink) {
    */
   @Override
   public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
-    CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
-    addChild(customStreamlet);
-    return customStreamlet;
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  @Override
-  public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
-    CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
-    addChild(customStreamlet);
-    return customStreamlet;
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  @Override
-  public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
-    CustomWindowStreamlet<R, T> customStreamlet =
-        new CustomWindowStreamlet<>(this, operator);
+    StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
     addChild(customStreamlet);
     return customStreamlet;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index e2b3df6370..75e9b7f4b0 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,7 +23,7 @@
 import org.apache.heron.api.bolt.BaseRichBolt;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
 
 /**
  * The Bolt interface that other operators of the streamlet packages extend.
@@ -31,7 +31,7 @@
  */
 public abstract class StreamletOperator<R, T>
     extends BaseRichBolt
-    implements IStreamletOperator<R, T> {
+    implements IStreamletRichOperator<R, T> {
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
deleted file mode 100644
index 7d4ae37da2..0000000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
-  private StreamletImpl<R> parent;
-  private IStreamletBasicOperator<R, T> op;
-
-  /**
-   * Create a custom streamlet from user defined CustomBasicOperator object.
-   * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeBasicOperator
-   */
-  public CustomBasicStreamlet(StreamletImpl<R> parent,
-                              IStreamletBasicOperator<R, T> op) {
-    this.parent = parent;
-    this.op = op;
-    setNumPartitions(parent.getNumPartitions());
-  }
-
-  @Override
-  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
-    return true;
-  }
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index 25258d2009..b0f71c0050 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -23,7 +23,10 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 
 /**
@@ -32,24 +35,49 @@
  */
 public class CustomStreamlet<R, T> extends StreamletImpl<T> {
   private StreamletImpl<R> parent;
-  private IStreamletOperator<R, T> op;
+  private IStreamletOperator<R, T> operator;
 
   /**
    * Create a custom streamlet from user defined CustomOperator object.
    * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeOperator
+   * @param operator The user defined CustomOperator
    */
   public CustomStreamlet(StreamletImpl<R> parent,
-                         IStreamletOperator<R, T> op) {
+                         IStreamletOperator<R, T> operator) {
     this.parent = parent;
-    this.op = op;
+    this.operator = operator;
     setNumPartitions(parent.getNumPartitions());
   }
 
+  /**
+   * Adds the user defined operator to the topology as a bolt that is shuffle
+   * grouped on the parent streamlet. The operator is cast to the bolt flavored
+   * interface it implements before being handed to setBolt, and that interface
+   * also picks the default name prefix: CUSTOM_BASIC for basic operators,
+   * CUSTOM for rich operators and CUSTOM_WINDOW for window operators.
+   * @param bldr The topology builder that receives the bolt
+   * @param stageNames The stage names handed to setDefaultNameIfNone
+   * @return true after the bolt has been added
+   * @throws RuntimeException if the operator is none of the supported kinds
+   */
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    if (operator instanceof IStreamletBasicOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+      IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else if (operator instanceof IStreamletRichOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+      IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else if (operator instanceof IStreamletWindowOperator) {
+      setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+      IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) operator;
+      bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
+    } else {
+      throw new RuntimeException("Unhandled operator class is found!");
+    }
+
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
deleted file mode 100644
index 8f05aa1fa3..0000000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomWindowOperator represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
-  private StreamletImpl<R> parent;
-  private IStreamletWindowOperator<R, T> op;
-
-  /**
-   * Create a custom streamlet from user defined CustomWindowOperator object.
-   * @param parent The parent(upstream) streamlet object
-   * @param op The user defined CustomeWindowOperator
-   */
-  public CustomWindowStreamlet(StreamletImpl<R> parent,
-                              IStreamletWindowOperator<R, T> op) {
-    this.parent = parent;
-    this.op = op;
-    setNumPartitions(parent.getNumPartitions());
-  }
-
-  @Override
-  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
-    setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
-    bldr.setBolt(getName(), op,  getNumPartitions()).shuffleGrouping(parent.getName());
-    return true;
-  }
-}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index b2dc4ca31d..5a11c75855 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,9 +19,7 @@
 package org.apache.heron.streamlet.scala
 
 import org.apache.heron.streamlet.{
-  IStreamletBasicOperator,
   IStreamletOperator,
-  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -233,22 +231,6 @@ trait Streamlet[R] {
    */
   def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
 
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
-
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index bb1b28ac3b..44b6ede305 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,9 +21,7 @@ package org.apache.heron.streamlet.scala.impl
 import scala.collection.JavaConverters
 
 import org.apache.heron.streamlet.{
-  IStreamletBasicOperator,
   IStreamletOperator,
-  IStreamletWindowOperator,
   JoinType,
   KeyValue,
   KeyedWindow,
@@ -332,28 +330,6 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
     fromJavaStreamlet(newJavaStreamlet)
   }
 
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
-    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
-    fromJavaStreamlet(newJavaStreamlet)
-  }
-
-  /**
-   * Returns a new Streamlet by applying the operator on each element of this streamlet.
-   * @param operator The operator to be applied
-   * @param <T> The return type of the transform
-   * @return Streamlet containing the output of the operation
-   */
-  override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
-    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
-    fromJavaStreamlet(newJavaStreamlet)
-  }
-
   /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 87af426de2..5fd12e0d15 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -36,16 +36,14 @@
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
 import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Streamlet;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -197,7 +195,7 @@ public void cleanup() {
     assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
   }
 
-  private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+  private class MyBoltOperator extends TestBolt implements IStreamletRichOperator<Double, Double> {
   }
 
   @Test
@@ -224,9 +222,9 @@ public void testCustomStreamletFromBasicBolt() throws Exception {
     Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyBasicBoltOperator());
-    assertTrue(streamlet instanceof CustomBasicStreamlet);
-    CustomBasicStreamlet<Double, Double> mStreamlet =
-        (CustomBasicStreamlet<Double, Double>) streamlet;
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet =
+        (CustomStreamlet<Double, Double>) streamlet;
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
@@ -243,9 +241,9 @@ public void testCustomStreamletFromWindowBolt() throws Exception {
     Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
     Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
                                                .applyOperator(new MyWindowBoltOperator());
-    assertTrue(streamlet instanceof CustomWindowStreamlet);
-    CustomWindowStreamlet<Double, Double> mStreamlet =
-        (CustomWindowStreamlet<Double, Double>) streamlet;
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet =
+        (CustomStreamlet<Double, Double>) streamlet;
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index bdaf7517ac..b8437327ff 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -29,15 +29,13 @@ import org.apache.heron.resource.{
 }
 import org.apache.heron.streamlet.{
   IStreamletBasicOperator,
-  IStreamletOperator,
+  IStreamletRichOperator,
   IStreamletWindowOperator,
   WindowConfig
 }
 import org.apache.heron.streamlet.impl.streamlets.{
   ConsumerStreamlet,
-  CustomBasicStreamlet,
   CustomStreamlet,
-  CustomWindowStreamlet,
   FilterStreamlet,
   FlatMapStreamlet,
   LogStreamlet,
@@ -407,10 +405,10 @@ class StreamletImplTest extends BaseFunSuite {
     assertEquals(0, transformStreamlet.getChildren.size())
   }
 
-  private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+  private class MyBoltOperator extends TestBolt with IStreamletRichOperator[Double, Double] {
   }
 
-  test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+  test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") {
     
     val testOperator = new MyBoltOperator()
     val supplierStreamlet = builder
@@ -495,11 +493,11 @@ class StreamletImplTest extends BaseFunSuite {
       mapStreamlet
         .getChildren()
         .get(0)
-        .isInstanceOf[CustomBasicStreamlet[_, _]])
+        .isInstanceOf[CustomStreamlet[_, _]])
     val customStreamlet = mapStreamlet
       .getChildren()
       .get(0)
-      .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+      .asInstanceOf[CustomStreamlet[Double, Double]]
     assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
     assertEquals(7, customStreamlet.getNumPartitions)
     assertEquals(0, customStreamlet.getChildren.size())
@@ -544,11 +542,11 @@ class StreamletImplTest extends BaseFunSuite {
       mapStreamlet
         .getChildren()
         .get(0)
-        .isInstanceOf[CustomWindowStreamlet[_, _]])
+        .isInstanceOf[CustomStreamlet[_, _]])
     val customStreamlet = mapStreamlet
       .getChildren()
       .get(0)
-      .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+      .asInstanceOf[CustomStreamlet[Double, Double]]
     assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
     assertEquals(7, customStreamlet.getNumPartitions)
     assertEquals(0, customStreamlet.getChildren.size())


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services