You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/24 18:28:05 UTC
[incubator-heron] branch master updated: Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new e292f02 Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)
e292f02 is described below
commit e292f028411ab8eed803b40cc49fa0a05bcbc79f
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Oct 24 11:27:57 2018 -0700
Add interfaces IStreamletOperator, IStreamletBasicOperator and IStrea… (#3051)
* Add interfaces IStreamletOperator, IStreamletBasicOperator and IStreamletWindowOperator and support them in Streamlet
* Fix HiddenField
---
.../heron/streamlet/IStreamletBasicOperator.java | 30 ++++
.../apache/heron/streamlet/IStreamletOperator.java | 30 ++++
.../heron/streamlet/IStreamletWindowOperator.java | 30 ++++
.../java/org/apache/heron/streamlet/Streamlet.java | 26 +++-
.../apache/heron/streamlet/impl/StreamletImpl.java | 49 +++++++
.../streamlet/impl/operators/FilterOperator.java | 2 +-
.../streamlet/impl/operators/FlatMapOperator.java | 2 +-
.../GeneralReduceByKeyAndWindowOperator.java | 2 +-
.../streamlet/impl/operators/JoinOperator.java | 2 +-
.../streamlet/impl/operators/MapOperator.java | 2 +-
.../operators/ReduceByKeyAndWindowOperator.java | 2 +-
.../impl/operators/StreamletOperator.java | 5 +-
.../impl/operators/StreamletWindowOperator.java | 5 +-
.../impl/operators/TransformOperator.java | 2 +-
.../streamlet/impl/operators/UnionOperator.java | 2 +-
.../heron/streamlet/impl/sinks/ComplexSink.java | 2 +-
.../heron/streamlet/impl/sinks/ConsumerSink.java | 2 +-
.../apache/heron/streamlet/impl/sinks/LogSink.java | 2 +-
.../impl/streamlets/CustomBasicStreamlet.java | 55 +++++++
.../streamlet/impl/streamlets/CustomStreamlet.java | 55 +++++++
.../impl/streamlets/CustomWindowStreamlet.java | 55 +++++++
.../apache/heron/streamlet/scala/Streamlet.scala | 27 ++++
.../heron/streamlet/scala/impl/StreamletImpl.scala | 36 +++++
.../org/apache/heron/resource/TestBasicBolt.java} | 38 ++---
.../java/org/apache/heron/resource/TestBolt.java} | 41 +++---
.../org/apache/heron/resource/TestWindowBolt.java} | 42 +++---
.../heron/streamlet/impl/StreamletImplTest.java | 64 ++++++++
heron/api/tests/scala/BUILD | 5 +-
.../org/apache/heron/resource/TestBasicBolt.scala | 48 ++++++
.../scala/org/apache/heron/resource/TestBolt.scala | 53 +++++++
.../org/apache/heron/resource/TestWindowBolt.scala | 52 +++++++
.../streamlet/scala/impl/StreamletImplTest.scala | 162 ++++++++++++++++++++-
32 files changed, 858 insertions(+), 72 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
new file mode 100644
index 0000000..e6b615a
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IBasicBolt;
+
+/**
+ * The interface for streamlet operators. It can be used to create
+ * operators based on existing Bolts (subclasses of IBasicBolt).
+ */
+public interface IStreamletBasicOperator<R, T> extends IBasicBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
new file mode 100644
index 0000000..24b989c
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IRichBolt;
+
+/**
+ * The interface for custom operators: it can be used to create
+ * operators based on existing Bolts (subclasses of IRichBolt).
+ */
+public interface IStreamletOperator<R, T> extends IRichBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
new file mode 100644
index 0000000..7369fca
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletWindowOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet;
+
+import org.apache.heron.api.bolt.IWindowedBolt;
+
+/**
+ * The interface for streamlet operators. It can be used to create
+ * operators based on existing Bolts (subclasses of IWindowedBolt).
+ */
+public interface IStreamletWindowOperator<R, T> extends IWindowedBolt {
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index 82dbef5..2490537 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -191,7 +191,7 @@ public interface Streamlet<R> {
Streamlet<R> union(Streamlet<? extends R> other);
/**
- * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
+ * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
* Before starting to cycle the transformFunction over the Streamlet, the open function is called.
* This allows the transform Function to do any kind of initialization/loading, etc.
* @param serializableTransformer The transformation function to be applied
@@ -202,6 +202,30 @@ public interface Streamlet<R> {
SerializableTransformer<R, ? extends T> serializableTransformer);
/**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator);
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator);
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index fb5b950..677b34e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -27,6 +27,9 @@ import java.util.Set;
import java.util.logging.Logger;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.JoinType;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
@@ -42,6 +45,9 @@ import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
@@ -100,6 +106,9 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
protected enum StreamletNamePrefix {
CONSUMER("consumer"),
+ CUSTOM("custom"),
+ CUSTOM_BASIC("customBasic"),
+ CUSTOM_WINDOW("customWindow"),
FILTER("filter"),
FLATMAP("flatmap"),
REDUCE("reduceByKeyAndWindow"),
@@ -484,6 +493,46 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
}
/**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ @Override
+ public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+ CustomStreamlet<R, T> customStreamlet = new CustomStreamlet<>(this, operator);
+ addChild(customStreamlet);
+ return customStreamlet;
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ @Override
+ public <T> Streamlet<T> applyOperator(IStreamletBasicOperator<R, T> operator) {
+ CustomBasicStreamlet<R, T> customStreamlet = new CustomBasicStreamlet<>(this, operator);
+ addChild(customStreamlet);
+ return customStreamlet;
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ @Override
+ public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
+ CustomWindowStreamlet<R, T> customStreamlet =
+ new CustomWindowStreamlet<>(this, operator);
+ addChild(customStreamlet);
+ return customStreamlet;
+ }
+
+ /**
* Verifies the requirement as the utility function.
* @param requirement The requirement to verify
* @param errorMessage The error message
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index 9b42ee6..af9afe8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -34,7 +34,7 @@ import org.apache.heron.streamlet.SerializablePredicate;
* For every tuple that it encounters, the filter function is run
* and the tuple is re-emitted if the predicate evaluates to true
*/
-public class FilterOperator<R> extends StreamletOperator {
+public class FilterOperator<R> extends StreamletOperator<R, R> {
private static final long serialVersionUID = -4748646871471052706L;
private SerializablePredicate<? super R> filterFn;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 4ba3a7f..35be0cf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -34,7 +34,7 @@ import org.apache.heron.streamlet.SerializableFunction;
* For every tuple, it applies the flatMapFunction, flattens the resulting
* tuples and emits them
*/
-public class FlatMapOperator<R, T> extends StreamletOperator {
+public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -2418329215159618998L;
private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index f37ebeb..7b9f71f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.Window;
* For every time window, the bolt goes over all the tuples in that window and applies the reduce
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
*/
-public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator {
+public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator<V, V> {
private static final long serialVersionUID = 2833576046687752396L;
private SerializableFunction<V, K> keyExtractor;
private VR identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 1aa3138..00c8d80 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -44,7 +44,7 @@ import org.apache.heron.streamlet.Window;
* a config parameter. Also taken as parameters are which source is left and right.
* This is needed for the semantics of outer/left/inner joins.
*/
-public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator {
+public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR> {
private static final long serialVersionUID = 4875450390444745407L;
private static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
private static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index b05e6f1..7ee9be1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -33,7 +33,7 @@ import org.apache.heron.streamlet.SerializableFunction;
* It takes in the mapFunction Function as the input.
* For every tuple, it applies the mapFunction, and emits the resulting value
*/
-public class MapOperator<R, T> extends StreamletOperator {
+public class MapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -1303096133107278700L;
private SerializableFunction<? super R, ? extends T> mapFn;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index cf6dc38..b0155df 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.Window;
* For every time window, the bolt goes over all the tuples in that window and applies the reduce
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
*/
-public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator {
+public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
private static final long serialVersionUID = 2833576046687750496L;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 27e6cd0..e2b3df6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -23,12 +23,15 @@ package org.apache.heron.streamlet.impl.operators;
import org.apache.heron.api.bolt.BaseRichBolt;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletOperator;
/**
* The Bolt interface that other operators of the streamlet packages extend.
* The only common stuff amongst all of them is the output streams
*/
-public abstract class StreamletOperator extends BaseRichBolt {
+public abstract class StreamletOperator<R, T>
+ extends BaseRichBolt
+ implements IStreamletOperator<R, T> {
private static final long serialVersionUID = 8524238140745238942L;
private static final String OUTPUT_FIELD_NAME = "output";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index e9ff1ba..f90655e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -23,12 +23,15 @@ package org.apache.heron.streamlet.impl.operators;
import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
/**
* The Bolt interface that other windowed operators of the streamlet packages extend.
* The only common stuff amongst all of them is the output streams
*/
-public abstract class StreamletWindowOperator extends BaseWindowedBolt {
+public abstract class StreamletWindowOperator<R, T>
+ extends BaseWindowedBolt
+ implements IStreamletWindowOperator<R, T> {
private static final long serialVersionUID = -4836560876041237959L;
private static final String OUTPUT_FIELD_NAME = "output";
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index e588f8b..0bf5232 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -39,7 +39,7 @@ import org.apache.heron.streamlet.impl.ContextImpl;
* It calls the transformFunction setup/cleanup at the beginning/end of the
* processing. And for every tuple, it applies the transformFunction, and emits the resulting value
*/
-public class TransformOperator<R, T> extends StreamletOperator
+public class TransformOperator<R, T> extends StreamletOperator<R, T>
implements IStatefulComponent<Serializable, Serializable> {
private static final long serialVersionUID = 429297144878185182L;
private SerializableTransformer<? super R, ? extends T> serializableTransformer;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index f061eae..151c0b4 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -31,7 +31,7 @@ import org.apache.heron.api.tuple.Values;
* UnionOperator is the class that implements the union functionality.
* Its a very simple bolt that re-emits every tuple that it sees.
*/
-public class UnionOperator<I> extends StreamletOperator {
+public class UnionOperator<I> extends StreamletOperator<I, I> {
private static final long serialVersionUID = -7326832064961413315L;
private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index ab4dbc4..943e3c5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -37,7 +37,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
* ConsumerSink is a very simple Sink that basically invokes a user supplied
* consume function for every tuple.
*/
-public class ComplexSink<R> extends StreamletOperator
+public class ComplexSink<R> extends StreamletOperator<R, R>
implements IStatefulComponent<Serializable, Serializable> {
private static final long serialVersionUID = 8717991188885786658L;
private Sink<R> sink;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index 687e944..af94704 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
* ConsumerSink is a very simple Sink that basically invokes a user supplied
* consume function for every tuple.
*/
-public class ConsumerSink<R> extends StreamletOperator {
+public class ConsumerSink<R> extends StreamletOperator<R, R> {
private static final long serialVersionUID = 8716140142187667638L;
private SerializableConsumer<R> consumer;
private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 37c36f4..7fdc3c9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
* LogSink is a very simple Bolt that implements the log functionality.
* It basically logs every tuple.
*/
-public class LogSink<R> extends StreamletOperator {
+public class LogSink<R> extends StreamletOperator<R, R> {
private static final long serialVersionUID = -6392422646613189818L;
private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
private OutputCollector collector;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
new file mode 100644
index 0000000..7d4ae37
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomBasicStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomBasicStreamlet<R, T> extends StreamletImpl<T> {
+ private StreamletImpl<R> parent;
+ private IStreamletBasicOperator<R, T> op;
+
+ /**
+ * Create a custom streamlet from user defined CustomBasicOperator object.
+ * @param parent The parent(upstream) streamlet object
+ * @param op The user defined CustomeBasicOperator
+ */
+ public CustomBasicStreamlet(StreamletImpl<R> parent,
+ IStreamletBasicOperator<R, T> op) {
+ this.parent = parent;
+ this.op = op;
+ setNumPartitions(parent.getNumPartitions());
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ return true;
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
new file mode 100644
index 0000000..25258d2
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomStreamlet represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomStreamlet<R, T> extends StreamletImpl<T> {
+ private StreamletImpl<R> parent;
+ private IStreamletOperator<R, T> op;
+
+ /**
+ * Create a custom streamlet from user defined CustomOperator object.
+ * @param parent The parent(upstream) streamlet object
+ * @param op The user defined CustomeOperator
+ */
+ public CustomStreamlet(StreamletImpl<R> parent,
+ IStreamletOperator<R, T> op) {
+ this.parent = parent;
+ this.op = op;
+ setNumPartitions(parent.getNumPartitions());
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ return true;
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
new file mode 100644
index 0000000..8f05aa1
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomWindowStreamlet.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * CustomWindowOperator represents a Streamlet that is made up of applying the user
+ * supplied custom operator to each element of the parent streamlet.
+ */
+public class CustomWindowStreamlet<R, T> extends StreamletImpl<T> {
+ private StreamletImpl<R> parent;
+ private IStreamletWindowOperator<R, T> op;
+
+ /**
+ * Create a custom streamlet from user defined CustomWindowOperator object.
+ * @param parent The parent(upstream) streamlet object
+ * @param op The user defined CustomeWindowOperator
+ */
+ public CustomWindowStreamlet(StreamletImpl<R> parent,
+ IStreamletWindowOperator<R, T> op) {
+ this.parent = parent;
+ this.op = op;
+ setNumPartitions(parent.getNumPartitions());
+ }
+
+ @Override
+ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
+ bldr.setBolt(getName(), op, getNumPartitions()).shuffleGrouping(parent.getName());
+ return true;
+ }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 88dc5c5..b2dc4ca 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -19,6 +19,9 @@
package org.apache.heron.streamlet.scala
import org.apache.heron.streamlet.{
+ IStreamletBasicOperator,
+ IStreamletOperator,
+ IStreamletWindowOperator,
JoinType,
KeyValue,
KeyedWindow,
@@ -222,6 +225,30 @@ trait Streamlet[R] {
def transform[T](
serializableTransformer: SerializableTransformer[R, _ <: T]): Streamlet[T]
+/**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T]
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T]
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 6d780bd..bb1b28a 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -21,6 +21,9 @@ package org.apache.heron.streamlet.scala.impl
import scala.collection.JavaConverters
import org.apache.heron.streamlet.{
+ IStreamletBasicOperator,
+ IStreamletOperator,
+ IStreamletWindowOperator,
JoinType,
KeyValue,
KeyedWindow,
@@ -319,6 +322,39 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
}
/**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ override def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T] = {
+ val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+ fromJavaStreamlet(newJavaStreamlet)
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ override def applyOperator[T](operator: IStreamletBasicOperator[R, T]): Streamlet[T] = {
+ val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+ fromJavaStreamlet(newJavaStreamlet)
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this streamlet.
+ * @param operator The operator to be applied
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ override def applyOperator[T](operator: IStreamletWindowOperator[R, T]): Streamlet[T] = {
+ val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
+ fromJavaStreamlet(newJavaStreamlet)
+ }
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation returns void
*/
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
similarity index 56%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
index f061eae..a316eb1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBasicBolt.java
@@ -17,38 +17,38 @@
* under the License.
*/
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
import java.util.Map;
-import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.bolt.BaseBasicBolt;
+import org.apache.heron.api.bolt.BasicOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
- private static final long serialVersionUID = -7326832064961413315L;
- private OutputCollector collector;
+public class TestBasicBolt extends BaseBasicBolt {
+ private int tupleExecuted;
- public UnionOperator() {
+ @Override
+ public void prepare(Map<String, Object> heronConf, TopologyContext context) {
+ this.tupleExecuted = 0;
}
- @SuppressWarnings("rawtypes")
@Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ tupleExecuted++;
+ collector.emit(new Values(input.getString(0)));
}
- @SuppressWarnings("unchecked")
@Override
- public void execute(Tuple tuple) {
- I obj = (I) tuple.getValue(0);
- collector.emit(new Values(obj));
- collector.ack(tuple);
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("output"));
+ }
+
+ public int getExecuted() {
+ return tupleExecuted;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
similarity index 58%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestBolt.java
index f061eae..b01ff29 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestBolt.java
@@ -17,38 +17,43 @@
* under the License.
*/
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
import java.util.Map;
+import org.apache.heron.api.bolt.BaseRichBolt;
import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
- private static final long serialVersionUID = -7326832064961413315L;
- private OutputCollector collector;
+public class TestBolt extends BaseRichBolt {
+ private OutputCollector outputCollector;
+ private int tupleExecuted;
- public UnionOperator() {
+ @Override
+ public void prepare(
+ Map<String, Object> map,
+ TopologyContext topologyContext,
+ OutputCollector collector) {
+ this.outputCollector = collector;
+ this.tupleExecuted = 0;
}
- @SuppressWarnings("rawtypes")
@Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
+ public void execute(Tuple tuple) {
+ tupleExecuted++;
+ outputCollector.emit(new Values(tuple.getString(0)));
}
- @SuppressWarnings("unchecked")
@Override
- public void execute(Tuple tuple) {
- I obj = (I) tuple.getValue(0);
- collector.emit(new Values(obj));
- collector.ack(tuple);
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("output"));
+ }
+
+ public int getExecuted() {
+ return tupleExecuted;
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
similarity index 53%
copy from heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
copy to heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
index f061eae..099f394 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/tests/java/org/apache/heron/resource/TestWindowBolt.java
@@ -17,38 +17,42 @@
* under the License.
*/
-
-package org.apache.heron.streamlet.impl.operators;
+package org.apache.heron.resource;
import java.util.Map;
+import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
-import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Values;
+import org.apache.heron.api.windowing.TupleWindow;
-/**
- * UnionOperator is the class that implements the union functionality.
- * Its a very simple bolt that re-emits every tuple that it sees.
- */
-public class UnionOperator<I> extends StreamletOperator {
- private static final long serialVersionUID = -7326832064961413315L;
- private OutputCollector collector;
+public class TestWindowBolt extends BaseWindowedBolt {
+ private OutputCollector outputCollector;
+ private int tupleExecuted;
- public UnionOperator() {
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context,
+ OutputCollector collector) {
+ this.outputCollector = collector;
+ this.tupleExecuted = 0;
}
- @SuppressWarnings("rawtypes")
@Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
+ public void execute(TupleWindow inputWindow) {
+ tupleExecuted++;
+ outputCollector.emit(new Values(inputWindow.get().size()));
}
- @SuppressWarnings("unchecked")
@Override
- public void execute(Tuple tuple) {
- I obj = (I) tuple.getValue(0);
- collector.emit(new Values(obj));
- collector.ack(tuple);
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("count"));
+ }
+
+ public int getExecuted() {
+ return tupleExecuted;
}
}
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index cec9b45..87af426 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -30,13 +30,22 @@ import org.junit.Test;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.resource.TestBasicBolt;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestWindowBolt;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
+import org.apache.heron.streamlet.IStreamletBasicOperator;
+import org.apache.heron.streamlet.IStreamletOperator;
+import org.apache.heron.streamlet.IStreamletWindowOperator;
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
@@ -188,6 +197,61 @@ public class StreamletImplTest {
assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
}
+ private class MyBoltOperator extends TestBolt implements IStreamletOperator<Double, Double> {
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCustomStreamletFromBolt() throws Exception {
+ Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+ .applyOperator(new MyBoltOperator());
+ assertTrue(streamlet instanceof CustomStreamlet);
+ CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet;
+ assertEquals(20, mStreamlet.getNumPartitions());
+ SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+ assertEquals(supplierStreamlet.getChildren().size(), 1);
+ assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ }
+
+ private class MyBasicBoltOperator extends TestBasicBolt
+ implements IStreamletBasicOperator<Double, Double> {
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCustomStreamletFromBasicBolt() throws Exception {
+ Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+ .applyOperator(new MyBasicBoltOperator());
+ assertTrue(streamlet instanceof CustomBasicStreamlet);
+ CustomBasicStreamlet<Double, Double> mStreamlet =
+ (CustomBasicStreamlet<Double, Double>) streamlet;
+ assertEquals(20, mStreamlet.getNumPartitions());
+ SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+ assertEquals(supplierStreamlet.getChildren().size(), 1);
+ assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ }
+
+ private class MyWindowBoltOperator extends TestWindowBolt
+ implements IStreamletWindowOperator<Double, Double> {
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCustomStreamletFromWindowBolt() throws Exception {
+ Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+ .applyOperator(new MyWindowBoltOperator());
+ assertTrue(streamlet instanceof CustomWindowStreamlet);
+ CustomWindowStreamlet<Double, Double> mStreamlet =
+ (CustomWindowStreamlet<Double, Double>) streamlet;
+ assertEquals(20, mStreamlet.getNumPartitions());
+ SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
+ assertEquals(supplierStreamlet.getChildren().size(), 1);
+ assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testSimpleBuild() throws Exception {
diff --git a/heron/api/tests/scala/BUILD b/heron/api/tests/scala/BUILD
index 61ce328..cb533f4 100644
--- a/heron/api/tests/scala/BUILD
+++ b/heron/api/tests/scala/BUILD
@@ -1,6 +1,9 @@
scala_test(
name = "api-scala-test",
- srcs = glob(["org/apache/heron/streamlet/scala/**/*.scala"]),
+ srcs = glob([
+ "org/apache/heron/streamlet/scala/**/*.scala",
+ "org/apache/heron/resource/**/*.scala"
+ ]),
deps = [
"//third_party/java:junit4",
"//heron/api/src/scala:api-scala",
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
new file mode 100644
index 0000000..53ceac8
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBasicBolt.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseBasicBolt
+import org.apache.heron.api.bolt.BasicOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBasicBolt extends BaseBasicBolt {
+ var tupleExecuted: Int = 0
+
+ override def prepare(heronConf: JMap[String, Object], context: TopologyContext): Unit = {
+ }
+
+ override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
+ tupleExecuted = tupleExecuted + 1
+ val v = input.getDouble(0)
+ collector.emit(new Values(v))
+ }
+
+
+ override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+ outputFieldsDeclarer.declare(new Fields("output"))
+ }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
new file mode 100644
index 0000000..05b0ab3
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestBolt.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseRichBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Tuple
+import org.apache.heron.api.tuple.Values
+
+class TestBolt extends BaseRichBolt {
+ var outputCollector: OutputCollector = _
+ var tupleExecuted: Int = 0
+
+ override def prepare(
+ conf: JMap[String, Object],
+ topologyContext: TopologyContext,
+ collector: OutputCollector): Unit = {
+ outputCollector = collector
+ }
+
+
+ override def execute(tuple: Tuple): Unit = {
+ tupleExecuted = tupleExecuted + 1
+ val v = tuple.getDouble(0)
+ outputCollector.emit(new Values(v))
+ }
+
+ override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer): Unit = {
+ outputFieldsDeclarer.declare(new Fields("output"))
+ }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
new file mode 100644
index 0000000..863b14d
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestWindowBolt.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+import java.lang.{Double => JDouble}
+import java.util.{Map => JMap}
+
+import org.apache.heron.api.bolt.BaseWindowedBolt
+import org.apache.heron.api.bolt.OutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+import org.apache.heron.api.windowing.TupleWindow
+
+class TestWindowBolt extends BaseWindowedBolt {
+ var outputCollector: OutputCollector = _
+ var tupleExecuted: Int = 0
+
+ override def prepare(topoConf: JMap[String, Object], context: TopologyContext,
+ collector: OutputCollector): Unit = {
+ this.outputCollector = collector
+ }
+
+ override def execute(inputWindow: TupleWindow): Unit = {
+ tupleExecuted = tupleExecuted + 1
+ val size: JDouble = inputWindow.get().size()
+ outputCollector.emit(new Values(size))
+ }
+
+ override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+ declarer.declare(new Fields("count"))
+ }
+}
+
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
index 3c6d073..bdaf751 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala
@@ -22,9 +22,22 @@ import scala.util.Random
import org.junit.Assert.{assertEquals, assertTrue}
-import org.apache.heron.streamlet.WindowConfig
+import org.apache.heron.resource.{
+ TestBasicBolt,
+ TestBolt,
+ TestWindowBolt
+}
+import org.apache.heron.streamlet.{
+ IStreamletBasicOperator,
+ IStreamletOperator,
+ IStreamletWindowOperator,
+ WindowConfig
+}
import org.apache.heron.streamlet.impl.streamlets.{
ConsumerStreamlet,
+ CustomBasicStreamlet,
+ CustomStreamlet,
+ CustomWindowStreamlet,
FilterStreamlet,
FlatMapStreamlet,
LogStreamlet,
@@ -394,6 +407,153 @@ class StreamletImplTest extends BaseFunSuite {
assertEquals(0, transformStreamlet.getChildren.size())
}
+ private class MyBoltOperator extends TestBolt with IStreamletOperator[Double, Double] {
+ }
+
+ test("StreamletImpl should support applyOperator operation on IStreamletOperator") {
+
+ val testOperator = new MyBoltOperator()
+ val supplierStreamlet = builder
+ .newSource(() => Random.nextDouble())
+ .setName("Supplier_Streamlet_1")
+ .setNumPartitions(3)
+
+ supplierStreamlet
+ .map[Double] { num: Double =>
+ num * 10
+ }
+ .setName("Map_Streamlet_1")
+ .setNumPartitions(2)
+ .applyOperator(testOperator)
+ .setName("Custom_Streamlet_1")
+ .setNumPartitions(7)
+
+ val supplierStreamletImpl =
+ supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+ assertEquals(1, supplierStreamletImpl.getChildren.size)
+ assertTrue(
+ supplierStreamletImpl
+ .getChildren(0)
+ .isInstanceOf[MapStreamlet[_, _]])
+ val mapStreamlet = supplierStreamletImpl
+ .getChildren(0)
+ .asInstanceOf[MapStreamlet[Double, Double]]
+ assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+ assertEquals(2, mapStreamlet.getNumPartitions)
+ assertEquals(1, mapStreamlet.getChildren.size())
+
+ assertTrue(
+ mapStreamlet
+ .getChildren()
+ .get(0)
+ .isInstanceOf[CustomStreamlet[_, _]])
+ val customStreamlet = mapStreamlet
+ .getChildren()
+ .get(0)
+ .asInstanceOf[CustomStreamlet[Double, Double]]
+ assertEquals("Custom_Streamlet_1", customStreamlet.getName)
+ assertEquals(7, customStreamlet.getNumPartitions)
+ assertEquals(0, customStreamlet.getChildren.size())
+ }
+
+ private class MyBasicBoltOperator extends TestBasicBolt
+ with IStreamletBasicOperator[Double, Double] {
+ }
+
+ test("StreamletImpl should support applyOperator operation on IStreamletBasicOperator") {
+ val testOperator = new MyBasicBoltOperator()
+ val supplierStreamlet = builder
+ .newSource(() => Random.nextDouble())
+ .setName("Supplier_Streamlet_1")
+ .setNumPartitions(3)
+
+ supplierStreamlet
+ .map[Double] { num: Double =>
+ num * 10
+ }
+ .setName("Map_Streamlet_1")
+ .setNumPartitions(2)
+ .applyOperator(testOperator)
+ .setName("CustomBasic_Streamlet_1")
+ .setNumPartitions(7)
+
+ val supplierStreamletImpl =
+ supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+ assertEquals(1, supplierStreamletImpl.getChildren.size)
+ assertTrue(
+ supplierStreamletImpl
+ .getChildren(0)
+ .isInstanceOf[MapStreamlet[_, _]])
+ val mapStreamlet = supplierStreamletImpl
+ .getChildren(0)
+ .asInstanceOf[MapStreamlet[Double, Double]]
+ assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+ assertEquals(2, mapStreamlet.getNumPartitions)
+ assertEquals(1, mapStreamlet.getChildren.size())
+
+ assertTrue(
+ mapStreamlet
+ .getChildren()
+ .get(0)
+ .isInstanceOf[CustomBasicStreamlet[_, _]])
+ val customStreamlet = mapStreamlet
+ .getChildren()
+ .get(0)
+ .asInstanceOf[CustomBasicStreamlet[Double, Double]]
+ assertEquals("CustomBasic_Streamlet_1", customStreamlet.getName)
+ assertEquals(7, customStreamlet.getNumPartitions)
+ assertEquals(0, customStreamlet.getChildren.size())
+ }
+
+ private class MyWindowBoltOperator extends TestWindowBolt
+ with IStreamletWindowOperator[Double, Double] {
+ }
+
+ test("StreamletImpl should support applyOperator operation on IStreamletWindowOperator") {
+ val testOperator = new MyWindowBoltOperator()
+ val supplierStreamlet = builder
+ .newSource(() => Random.nextDouble())
+ .setName("Supplier_Streamlet_1")
+ .setNumPartitions(3)
+
+ supplierStreamlet
+ .map[Double] { num: Double =>
+ num * 10
+ }
+ .setName("Map_Streamlet_1")
+ .setNumPartitions(2)
+ .applyOperator(testOperator)
+ .setName("CustomWindow_Streamlet_1")
+ .setNumPartitions(7)
+
+ val supplierStreamletImpl =
+ supplierStreamlet.asInstanceOf[StreamletImpl[Double]]
+ assertEquals(1, supplierStreamletImpl.getChildren.size)
+ assertTrue(
+ supplierStreamletImpl
+ .getChildren(0)
+ .isInstanceOf[MapStreamlet[_, _]])
+ val mapStreamlet = supplierStreamletImpl
+ .getChildren(0)
+ .asInstanceOf[MapStreamlet[Double, Double]]
+ assertEquals("Map_Streamlet_1", mapStreamlet.getName)
+ assertEquals(2, mapStreamlet.getNumPartitions)
+ assertEquals(1, mapStreamlet.getChildren.size())
+
+ assertTrue(
+ mapStreamlet
+ .getChildren()
+ .get(0)
+ .isInstanceOf[CustomWindowStreamlet[_, _]])
+ val customStreamlet = mapStreamlet
+ .getChildren()
+ .get(0)
+ .asInstanceOf[CustomWindowStreamlet[Double, Double]]
+ assertEquals("CustomWindow_Streamlet_1", customStreamlet.getName)
+ assertEquals(7, customStreamlet.getNumPartitions)
+ assertEquals(0, customStreamlet.getChildren.size())
+ }
+
test("StreamletImpl should support reduce operation") {
val supplierStreamlet = builder
.newSource(() => Random.nextInt(10))