You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/11/03 03:01:12 UTC
[incubator-heron] branch master updated: Nwang/refactor custom operator interface v2 (#3100)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 34a64f6 Nwang/refactor custom operator interface v2 (#3100)
34a64f6 is described below
commit 34a64f6eec69e22a54187d03d0fb4ad8ca2801cf
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Fri Nov 2 20:01:05 2018 -0700
Nwang/refactor custom operator interface v2 (#3100)
* Reduce number of applyOperator() functions in Streamlet
* Remove CustomBasicStreamlet and CustomWindowStreamlet
---
.../heron/streamlet/IStreamletBasicOperator.java | 2 +-
.../apache/heron/streamlet/IStreamletOperator.java | 7 +--
...etOperator.java => IStreamletRichOperator.java} | 2 +-
.../heron/streamlet/IStreamletWindowOperator.java | 2 +-
.../java/org/apache/heron/streamlet/Streamlet.java | 16 -------
.../apache/heron/streamlet/impl/StreamletImpl.java | 34 +------------
.../impl/operators/StreamletOperator.java | 4 +-
.../impl/streamlets/CustomBasicStreamlet.java | 55 ----------------------
.../streamlet/impl/streamlets/CustomStreamlet.java | 29 +++++++++---
.../impl/streamlets/CustomWindowStreamlet.java | 55 ----------------------
.../apache/heron/streamlet/scala/Streamlet.scala | 18 -------
.../heron/streamlet/scala/impl/StreamletImpl.scala | 24 ----------
.../heron/streamlet/impl/StreamletImplTest.java | 18 ++++---
.../streamlet/scala/impl/StreamletImplTest.scala | 16 +++----
14 files changed, 47 insertions(+), 235 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
index e6b615a..98b87f3 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IBasicBolt;
* The interface for streamlet operators. It can be used to create
* operators based on existing Bolts (subclasses of IBasicBolt).
*/
-public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+public interface IStreamletBasicOperator<R, T> extends IStreamletOperator<R, T>, IBasicBolt {
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
index 24b989c..0eecb60 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -20,11 +20,8 @@
package org.apache.heron.streamlet;
-import org.apache.heron.api.bolt.IRichBolt;
-
/**
- * The interface for custom operators: it can be used to create
- * operators based on existing Bolts (subclasses of IRichBolt).
+ * The base interface for all Streamlet operator interfaces.
*/
-public interface IStreamletOperator<R, T> extends IRichBolt {
+public interface IStreamletOperator<R, T> {
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
similarity index 91%
copy from heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
copy to heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
index 24b989c..c6800b8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletRichOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IRichBolt;
* The interface for custom operators: it can be used to create
* operators based on existing Bolts (subclasses of IRichBolt).
*/
-public interface IStreamletOperator<R, T> extends IRichBolt {
+public interface IStreamletRichOperator<R, T> extends IStreamletOperator<R, T>, IRichBolt {
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
index 7369fca..dcccae1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -26,5 +26,5 @@ import org.apache.heron.api.bolt.IWindowedBolt;
* The interface for streamlet operators. It can be used to create
* operators based on existing Bolts (subclasses of IWindowedBolt).
*/
-public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+public interface IStreamletWindowOperator<R, T> extends IStreamletOperator<R, T>, IWindowedBolt {
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 2490537..b5fe931 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -210,22 +210,6 @@ public interface Streamlet<R> {
<T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
/**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
-
- /**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
-
- /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 677b34e..b1c427a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,9 +27,7 @@ import java.util.Set;
import java.util.logging.Logger;
import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
import org.apache.heron.streamlet.IStreamletOperator;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.JoinType;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
@@ -45,9 +43,7 @@ import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -218,6 +214,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
if (built) {
throw new RuntimeException("Logic Error While building " + getName());
}
+
if (doBuild(bldr, stageNames)) {
built = true;
for (StreamletImpl<?> streamlet : children) {
@@ -500,34 +497,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
*/
@Override
public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
- CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
- addChild(customStreamlet);
- return customStreamlet;
- }
-
- /**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- @Override
- public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
- CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
- addChild(customStreamlet);
- return customStreamlet;
- }
-
- /**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- @Override
- public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
- CustomWindowStreamlet<R, T> customStreamlet =
- new CustomWindowStreamlet<>(this, operator);
+ StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
addChild(customStreamlet);
return customStreamlet;
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index e2b3df6..75e9b7f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,7 +23,7 @@ package org.apache.heron.streamlet.impl.operators;
import org.apache.heron.api.bolt.BaseRichBolt;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.tuple.Fields;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
/**
* The Bolt interface that other operators of the streamlet packages extend.
@@ -31,7 +31,7 @@ import org.apache.heron.streamlet.IStreamletOperator;
*/
public abstract class StreamletOperator<R, T>
extends BaseRichBolt
- implements IStreamletOperator<R, T> {
+ implements IStreamletRichOperator<R, T> {
private static final long serialVersionUID = 8524238140745238942L;
private static final String OUTPUT_FIELD_NAME = "output";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
deleted file mode 100644
index 7d4ae37..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
- private StreamletImpl<R> parent;
- private IStreamletBasicOperator<R, T> op;
-
- /**
- * Create a custom streamlet from user defined CustomBasicOperator object.
- * @param parent The parent(upstream) streamlet object
- * @param op The user defined CustomeBasicOperator
- */
- public CustomBasicStreamlet(StreamletImpl<R> parent,
- IStreamletBasicOperator<R, T> op) {
- this.parent = parent;
- this.op = op;
- setNumPartitions(parent.getNumPartitions());
- }
-
- @Override
- public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
- setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
- bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
- return true;
- }
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index 25258d2..b0f71c0 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -23,7 +23,10 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.impl.StreamletImpl;
/**
@@ -32,24 +35,38 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
*/
public class CustomStreamlet<R, T> extends StreamletImpl<T> {
private StreamletImpl<R> parent;
- private IStreamletOperator<R, T> op;
+ private IStreamletOperator<R, T> operator;
/**
* Create a custom streamlet from user defined CustomOperator object.
* @param parent The parent(upstream) streamlet object
- * @param op The user defined CustomeOperator
+ * @param operator The user defined CustomeOperator
*/
public CustomStreamlet(StreamletImpl<R> parent,
- IStreamletOperator<R, T> op) {
+ IStreamletOperator<R, T> operator) {
this.parent = parent;
- this.op = op;
+ this.operator = operator;
setNumPartitions(parent.getNumPartitions());
}
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
- setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
- bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ if (operator instanceof IStreamletBasicOperator) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+ IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) operator;
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ } else if (operator instanceof IStreamletRichOperator) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+ IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) operator;
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ } else if (operator instanceof IStreamletWindowOperator) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+ IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) operator;
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ } else {
+ throw new RuntimeException("Unhandled operator class is found!");
+ }
+
return true;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
deleted file mode 100644
index 8f05aa1..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.heron.streamlet.impl.streamlets;
-
-import java.util.Set;
-
-import org.apache.heron.api.topology.TopologyBuilder;
-import org.apache.heron.streamlet.IStreamletWindowOperator;
-import org.apache.heron.streamlet.impl.StreamletImpl;
-
-/**
- * CustomWindowOperator represents a Streamlet that is made up of applying the user
- * supplied custom operator to each element of the parent streamlet.
- */
-public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
- private StreamletImpl<R> parent;
- private IStreamletWindowOperator<R, T> op;
-
- /**
- * Create a custom streamlet from user defined CustomWindowOperator object.
- * @param parent The parent(upstream) streamlet object
- * @param op The user defined CustomeWindowOperator
- */
- public CustomWindowStreamlet(StreamletImpl<R> parent,
- IStreamletWindowOperator<R, T> op) {
- this.parent = parent;
- this.op = op;
- setNumPartitions(parent.getNumPartitions());
- }
-
- @Override
- public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
- setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
- bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
- return true;
- }
-}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index b2dc4ca..5a11c75 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,9 +19,7 @@
package org.apache.heron.streamlet.scala
import org.apache.heron.streamlet.{
- IStreamletBasicOperator,
IStreamletOperator,
- IStreamletWindowOperator,
JoinType,
KeyValue,
KeyedWindow,
@@ -234,22 +232,6 @@ trait Streamlet[R] {
def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
/**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
-
- /**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
-
- /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index bb1b28a..44b6ede 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,9 +21,7 @@ package org.apache.heron.streamlet.scala.impl
import scala.collection.JavaConverters
import org.apache.heron.streamlet.{
- IStreamletBasicOperator,
IStreamletOperator,
- IStreamletWindowOperator,
JoinType,
KeyValue,
KeyedWindow,
@@ -333,28 +331,6 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
}
/**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
- val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
- fromJavaStreamlet(newJavaStreamlet)
- }
-
- /**
- * Returns a new Streamlet by applying the operator on each element of this streamlet.
- * @param operator The operator to be applied
- * @param <T> The return type of the transform
- * @return Streamlet containing the output of the operation
- */
- override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
- val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
- fromJavaStreamlet(newJavaStreamlet)
- }
-
- /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 87af426..5fd12e0 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -36,16 +36,14 @@ import org.apache.heron.resource.TestWindowBolt;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.IStreamletBasicOperator;
-import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletRichOperator;
import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -197,7 +195,7 @@ public class StreamletImplTest {
assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
}
- private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+ private class MyBoltOperator extends TestBolt implements IStreamletRichOperator<Double, Double> {
}
@Test
@@ -224,9 +222,9 @@ public class StreamletImplTest {
Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new MyBasicBoltOperator());
- assertTrue(streamlet instanceof CustomBasicStreamlet);
- CustomBasicStreamlet<Double, Double> mStreamlet =
- (CustomBasicStreamlet<Double, Double>) streamlet;
+ assertTrue(streamlet instanceof CustomStreamlet);
+ CustomStreamlet<Double, Double> mStreamlet =
+ (CustomStreamlet<Double, Double>) streamlet;
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
@@ -243,9 +241,9 @@ public class StreamletImplTest {
Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new MyWindowBoltOperator());
- assertTrue(streamlet instanceof CustomWindowStreamlet);
- CustomWindowStreamlet<Double, Double> mStreamlet =
- (CustomWindowStreamlet<Double, Double>) streamlet;
+ assertTrue(streamlet instanceof CustomStreamlet);
+ CustomStreamlet<Double, Double> mStreamlet =
+ (CustomStreamlet<Double, Double>) streamlet;
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index bdaf751..b843732 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -29,15 +29,13 @@ import org.apache.heron.resource.{
}
import org.apache.heron.streamlet.{
IStreamletBasicOperator,
- IStreamletOperator,
+ IStreamletRichOperator,
IStreamletWindowOperator,
WindowConfig
}
import org.apache.heron.streamlet.impl.streamlets.{
ConsumerStreamlet,
- CustomBasicStreamlet,
CustomStreamlet,
- CustomWindowStreamlet,
FilterStreamlet,
FlatMapStreamlet,
LogStreamlet,
@@ -407,10 +405,10 @@ class StreamletImplTest extends BaseFunSuite {
assertEquals(0, transformStreamlet.getChildren.size())
}
- private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+ private class MyBoltOperator extends TestBolt with IStreamletRichOperator[Double, Double] {
}
- test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+ test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") {
val testOperator = new MyBoltOperator()
val supplierStreamlet = builder
@@ -495,11 +493,11 @@ class StreamletImplTest extends BaseFunSuite {
mapStreamlet
.getChildren()
.get(0)
- .isInstanceOf[CustomBasicStreamlet[_, _]])
+ .isInstanceOf[CustomStreamlet[_, _]])
val customStreamlet = mapStreamlet
.getChildren()
.get(0)
- .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+ .asInstanceOf[CustomStreamlet[Double, Double]]
assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
assertEquals(7, customStreamlet.getNumPartitions)
assertEquals(0, customStreamlet.getChildren.size())
@@ -544,11 +542,11 @@ class StreamletImplTest extends BaseFunSuite {
mapStreamlet
.getChildren()
.get(0)
- .isInstanceOf[CustomWindowStreamlet[_, _]])
+ .isInstanceOf[CustomStreamlet[_, _]])
val customStreamlet = mapStreamlet
.getChildren()
.get(0)
- .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+ .asInstanceOf[CustomStreamlet[Double, Double]]
assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
assertEquals(7, customStreamlet.getNumPartitions)
assertEquals(0, customStreamlet.getChildren.size())