You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/11/19 20:56:13 UTC
[incubator-heron] branch master updated: [Java Streamlet API]
Support abstractions on Streamlet Operators (#3112)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 8f44c11 [Java Streamlet API] Support abstractions on Streamlet Operators (#3112)
8f44c11 is described below
commit 8f44c118b0c5cbe94f4e2ec21fb75a04dc8a7f32
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon Nov 19 20:56:08 2018 +0000
[Java Streamlet API] Support abstractions on Streamlet Operators (#3112)
* [Java Streamlet API] Support Abstractions on Streamlet Operators
---
.../java/org/apache/heron/streamlet/impl/ContextImpl.java | 1 -
.../org/apache/heron/streamlet/impl/WindowConfigImpl.java | 4 +---
.../heron/streamlet/impl/operators/FilterOperator.java | 13 -------------
.../heron/streamlet/impl/operators/FlatMapOperator.java | 13 -------------
.../operators/GeneralReduceByKeyAndWindowOperator.java | 10 ----------
.../heron/streamlet/impl/operators/JoinOperator.java | 10 ----------
.../heron/streamlet/impl/operators/MapOperator.java | 13 -------------
.../impl/operators/ReduceByKeyAndWindowOperator.java | 11 +----------
.../heron/streamlet/impl/operators/StreamletOperator.java | 15 ++++++++++++++-
.../streamlet/impl/operators/StreamletWindowOperator.java | 15 ++++++++++++++-
.../heron/streamlet/impl/operators/TransformOperator.java | 5 ++---
.../heron/streamlet/impl/operators/UnionOperator.java | 12 ------------
.../apache/heron/streamlet/impl/sinks/ComplexSink.java | 3 +--
.../apache/heron/streamlet/impl/sinks/ConsumerSink.java | 13 +------------
.../org/apache/heron/streamlet/impl/sinks/LogSink.java | 12 +-----------
15 files changed, 35 insertions(+), 115 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
index 866a2ba..f1495bd 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.io.Serializable;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
index 6e144e1..09c29f5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
@@ -17,10 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
-
import java.time.Duration;
import org.apache.heron.api.bolt.BaseWindowedBolt;
@@ -42,7 +40,7 @@ public final class WindowConfigImpl implements WindowConfig {
private TriggerPolicy<Tuple, ?> triggerPolicy;
private EvictionPolicy<Tuple, ?> evictionPolicy;
- public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
+ public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
this.windowType = WindowType.TIME;
this.windowDuration = windowDuration;
this.slidingIntervalDuration = slidingIntervalDuration;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index af9afe8..24eea58 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -17,13 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializablePredicate;
@@ -38,18 +33,10 @@ public class FilterOperator<R> extends StreamletOperator<R, R> {
private static final long serialVersionUID = -4748646871471052706L;
private SerializablePredicate<? super R> filterFn;
- private OutputCollector collector;
-
public FilterOperator(SerializablePredicate<? super R> filterFn) {
this.filterFn = filterFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 35be0cf..dce70e7 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -17,13 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializableFunction;
@@ -38,19 +33,11 @@ public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -2418329215159618998L;
private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;
- private OutputCollector collector;
-
public FlatMapOperator(
SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn) {
this.flatMapFn = flatMapFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index 7b9f71f..fe6d458 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
import java.util.HashMap;
import java.util.Map;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
@@ -45,7 +42,6 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
private SerializableFunction<V, K> keyExtractor;
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
- private OutputCollector collector;
public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity,
SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
@@ -54,12 +50,6 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
this.reduceFn = reduceFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 00c8d80..157b487 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
import java.util.HashMap;
@@ -26,8 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.heron.api.Pair;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
@@ -58,7 +55,6 @@ public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR>
private SerializableFunction<V2, K> rightKeyExtractor;
// The user supplied join function
private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
- private OutputCollector collector;
public JoinOperator(JoinType joinType, String leftComponent, String rightComponent,
SerializableFunction<V1, K> leftKeyExtractor,
@@ -72,12 +68,6 @@ public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR>
this.joinFn = joinFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> cfg = super.getComponentConfiguration();
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index 7ee9be1..73bfbf5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -17,13 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.streamlet.SerializableFunction;
@@ -37,18 +32,10 @@ public class MapOperator<R, T> extends StreamletOperator<R, T> {
private static final long serialVersionUID = -1303096133107278700L;
private SerializableFunction<? super R, ? extends T> mapFn;
- private OutputCollector collector;
-
public MapOperator(SerializableFunction<? super R, ? extends T> mapFn) {
this.mapFn = mapFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index b0155df..92c2d20 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
import java.util.HashMap;
import java.util.Map;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
@@ -41,11 +38,11 @@ import org.apache.heron.streamlet.Window;
* function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
*/
public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
+
private static final long serialVersionUID = 2833576046687750496L;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
private SerializableBinaryOperator<V> reduceFn;
- private OutputCollector collector;
public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
SerializableFunction<R, V> valueExtractor,
@@ -55,12 +52,6 @@ public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperat
this.reduceFn = reduceFn;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(TupleWindow inputWindow) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 75e9b7f..0b08478 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -17,11 +17,14 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
+import java.util.Map;
+
import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.streamlet.IStreamletRichOperator;
@@ -35,6 +38,16 @@ public abstract class StreamletOperator<R, T>
private static final long serialVersionUID = 8524238140745238942L;
private static final String OUTPUT_FIELD_NAME = "output";
+ protected OutputCollector collector;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map<String, Object> map,
+ TopologyContext topologyContext,
+ OutputCollector outputCollector) {
+ collector = outputCollector;
+ }
+
/**
* The operators implementing streamlet functionality have some properties.
* 1. They all output only one stream
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index f90655e..9c9f4f8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -17,11 +17,14 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
+import java.util.Map;
+
import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.streamlet.IStreamletWindowOperator;
@@ -32,8 +35,18 @@ import org.apache.heron.streamlet.IStreamletWindowOperator;
public abstract class StreamletWindowOperator<R, T>
extends BaseWindowedBolt
implements IStreamletWindowOperator<R, T> {
+
private static final long serialVersionUID = -4836560876041237959L;
private static final String OUTPUT_FIELD_NAME = "output";
+ protected OutputCollector collector;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map<String, Object> map,
+ TopologyContext topologyContext,
+ OutputCollector outputCollector) {
+ collector = outputCollector;
+ }
/**
* The operators implementing streamlet functionality have some properties.
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index 0bf5232..2379a3b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
import java.io.Serializable;
@@ -41,10 +40,10 @@ import org.apache.heron.streamlet.impl.ContextImpl;
*/
public class TransformOperator<R, T> extends StreamletOperator<R, T>
implements IStatefulComponent<Serializable, Serializable> {
+
private static final long serialVersionUID = 429297144878185182L;
private SerializableTransformer<? super R, ? extends T> serializableTransformer;
- private OutputCollector collector;
private State<Serializable, Serializable> state;
public TransformOperator(
@@ -71,7 +70,7 @@ public class TransformOperator<R, T> extends StreamletOperator<R, T>
public void prepare(Map<String, Object> map,
TopologyContext topologyContext,
OutputCollector outputCollector) {
- collector = outputCollector;
+ super.prepare(map, topologyContext, outputCollector);
Context context = new ContextImpl(topologyContext, map, state);
serializableTransformer.setup(context);
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index 151c0b4..4d56bf1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -17,13 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.operators;
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
@@ -33,17 +28,10 @@ import org.apache.heron.api.tuple.Values;
*/
public class UnionOperator<I> extends StreamletOperator<I, I> {
private static final long serialVersionUID = -7326832064961413315L;
- private OutputCollector collector;
public UnionOperator() {
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index 943e3c5..90b4217 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.sinks;
import java.io.Serializable;
@@ -39,9 +38,9 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
*/
public class ComplexSink<R> extends StreamletOperator<R, R>
implements IStatefulComponent<Serializable, Serializable> {
+
private static final long serialVersionUID = 8717991188885786658L;
private Sink<R> sink;
- private OutputCollector collector;
private State<Serializable, Serializable> state;
public ComplexSink(Sink<R> sink) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index af94704..dd5e4a5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -17,13 +17,8 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.sinks;
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.impl.operators.StreamletOperator;
@@ -33,20 +28,14 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
* consume function for every tuple.
*/
public class ConsumerSink<R> extends StreamletOperator<R, R> {
+
private static final long serialVersionUID = 8716140142187667638L;
private SerializableConsumer<R> consumer;
- private OutputCollector collector;
public ConsumerSink(SerializableConsumer<R> consumer) {
this.consumer = consumer;
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 7fdc3c9..ae1f6d1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -17,14 +17,10 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.sinks;
-import java.util.Map;
import java.util.logging.Logger;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.streamlet.impl.operators.StreamletOperator;
@@ -33,19 +29,13 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
* It basically logs every tuple.
*/
public class LogSink<R> extends StreamletOperator<R, R> {
+
private static final long serialVersionUID = -6392422646613189818L;
private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
- private OutputCollector collector;
public LogSink() {
}
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.collector = outputCollector;
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {