You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/11/19 20:56:13 UTC

[incubator-heron] branch master updated: [Java Streamlet API] Support abstractions on Streamlet Operators (#3112)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f44c11  [Java Streamlet API] Support abstractions on Streamlet Operators (#3112)
8f44c11 is described below

commit 8f44c118b0c5cbe94f4e2ec21fb75a04dc8a7f32
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon Nov 19 20:56:08 2018 +0000

    [Java Streamlet API] Support abstractions on Streamlet Operators (#3112)
    
    * [Java Streamlet API] Support Abstractions on Streamlet Operators
---
 .../java/org/apache/heron/streamlet/impl/ContextImpl.java |  1 -
 .../org/apache/heron/streamlet/impl/WindowConfigImpl.java |  4 +---
 .../heron/streamlet/impl/operators/FilterOperator.java    | 13 -------------
 .../heron/streamlet/impl/operators/FlatMapOperator.java   | 13 -------------
 .../operators/GeneralReduceByKeyAndWindowOperator.java    | 10 ----------
 .../heron/streamlet/impl/operators/JoinOperator.java      | 10 ----------
 .../heron/streamlet/impl/operators/MapOperator.java       | 13 -------------
 .../impl/operators/ReduceByKeyAndWindowOperator.java      | 11 +----------
 .../heron/streamlet/impl/operators/StreamletOperator.java | 15 ++++++++++++++-
 .../streamlet/impl/operators/StreamletWindowOperator.java | 15 ++++++++++++++-
 .../heron/streamlet/impl/operators/TransformOperator.java |  5 ++---
 .../heron/streamlet/impl/operators/UnionOperator.java     | 12 ------------
 .../apache/heron/streamlet/impl/sinks/ComplexSink.java    |  3 +--
 .../apache/heron/streamlet/impl/sinks/ConsumerSink.java   | 13 +------------
 .../org/apache/heron/streamlet/impl/sinks/LogSink.java    | 12 +-----------
 15 files changed, 35 insertions(+), 115 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
index 866a2ba..f1495bd 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
 import java.io.Serializable;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
index 6e144e1..09c29f5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
-
 import java.time.Duration;
 
 import org.apache.heron.api.bolt.BaseWindowedBolt;
@@ -42,7 +40,7 @@ public final class WindowConfigImpl implements WindowConfig {
   private TriggerPolicy<Tuple, ?> triggerPolicy;
   private EvictionPolicy<Tuple, ?> evictionPolicy;
 
-  public  WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
+  public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
     this.windowType = WindowType.TIME;
     this.windowDuration = windowDuration;
     this.slidingIntervalDuration = slidingIntervalDuration;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index af9afe8..24eea58 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializablePredicate;
@@ -38,18 +33,10 @@ public class FilterOperator<R> extends StreamletOperator<R, R> {
   private static final long serialVersionUID = -4748646871471052706L;
   private SerializablePredicate<? super R> filterFn;
 
-  private OutputCollector collector;
-
   public FilterOperator(SerializablePredicate<? super R> filterFn) {
     this.filterFn = filterFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 35be0cf..dce70e7 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializableFunction;
@@ -38,19 +33,11 @@ public class FlatMapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -2418329215159618998L;
   private SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn;
 
-  private OutputCollector collector;
-
   public FlatMapOperator(
       SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn) {
     this.flatMapFn = flatMapFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index 7b9f71f..fe6d458 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -45,7 +42,6 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
   private SerializableFunction<V, K> keyExtractor;
   private VR identity;
   private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
-  private OutputCollector collector;
 
   public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity,
                             SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
@@ -54,12 +50,6 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
     this.reduceFn = reduceFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 00c8d80..157b487 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
@@ -26,8 +25,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.heron.api.Pair;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -58,7 +55,6 @@ public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR>
   private SerializableFunction<V2, K> rightKeyExtractor;
   // The user supplied join function
   private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
-  private OutputCollector collector;
 
   public JoinOperator(JoinType joinType, String leftComponent, String rightComponent,
                       SerializableFunction<V1, K> leftKeyExtractor,
@@ -72,12 +68,6 @@ public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator<V1, VR>
     this.joinFn = joinFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @Override
   public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> cfg = super.getComponentConfiguration();
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index 7ee9be1..73bfbf5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializableFunction;
@@ -37,18 +32,10 @@ public class MapOperator<R, T> extends StreamletOperator<R, T> {
   private static final long serialVersionUID = -1303096133107278700L;
   private SerializableFunction<? super R, ? extends T> mapFn;
 
-  private OutputCollector collector;
-
   public MapOperator(SerializableFunction<? super R, ? extends T> mapFn) {
     this.mapFn = mapFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index b0155df..92c2d20 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -41,11 +38,11 @@ import org.apache.heron.streamlet.Window;
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
  */
 public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
+
   private static final long serialVersionUID = 2833576046687750496L;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
   private SerializableBinaryOperator<V> reduceFn;
-  private OutputCollector collector;
 
   public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
                                       SerializableFunction<R, V> valueExtractor,
@@ -55,12 +52,6 @@ public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperat
     this.reduceFn = reduceFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 75e9b7f..0b08478 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -17,11 +17,14 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
+import java.util.Map;
+
 import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.streamlet.IStreamletRichOperator;
 
@@ -35,6 +38,16 @@ public abstract class StreamletOperator<R, T>
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
+  protected OutputCollector collector;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext topologyContext,
+                      OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
+
   /**
    * The operators implementing streamlet functionality have some properties.
    * 1. They all output only one stream
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index f90655e..9c9f4f8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -17,11 +17,14 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
+import java.util.Map;
+
 import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 
@@ -32,8 +35,18 @@ import org.apache.heron.streamlet.IStreamletWindowOperator;
 public abstract class StreamletWindowOperator<R, T>
     extends BaseWindowedBolt
     implements IStreamletWindowOperator<R, T> {
+
   private static final long serialVersionUID = -4836560876041237959L;
   private static final String OUTPUT_FIELD_NAME = "output";
+  protected OutputCollector collector;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext topologyContext,
+                      OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
 
   /**
    * The operators implementing streamlet functionality have some properties.
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index 0bf5232..2379a3b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.io.Serializable;
@@ -41,10 +40,10 @@ import org.apache.heron.streamlet.impl.ContextImpl;
  */
 public class TransformOperator<R, T> extends StreamletOperator<R, T>
     implements IStatefulComponent<Serializable, Serializable> {
+
   private static final long serialVersionUID = 429297144878185182L;
   private SerializableTransformer<? super R, ? extends T> serializableTransformer;
 
-  private OutputCollector collector;
   private State<Serializable, Serializable> state;
 
   public TransformOperator(
@@ -71,7 +70,7 @@ public class TransformOperator<R, T> extends StreamletOperator<R, T>
   public void prepare(Map<String, Object> map,
                       TopologyContext topologyContext,
                       OutputCollector outputCollector) {
-    collector = outputCollector;
+    super.prepare(map, topologyContext, outputCollector);
     Context context = new ContextImpl(topologyContext, map, state);
     serializableTransformer.setup(context);
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index 151c0b4..4d56bf1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 
@@ -33,17 +28,10 @@ import org.apache.heron.api.tuple.Values;
  */
 public class UnionOperator<I> extends StreamletOperator<I, I> {
   private static final long serialVersionUID = -7326832064961413315L;
-  private OutputCollector collector;
 
   public UnionOperator() {
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index 943e3c5..90b4217 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
 import java.io.Serializable;
@@ -39,9 +38,9 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  */
 public class ComplexSink<R> extends StreamletOperator<R, R>
     implements IStatefulComponent<Serializable, Serializable> {
+
   private static final long serialVersionUID = 8717991188885786658L;
   private Sink<R> sink;
-  private OutputCollector collector;
   private State<Serializable, Serializable> state;
 
   public ComplexSink(Sink<R> sink) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index af94704..dd5e4a5 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.impl.operators.StreamletOperator;
@@ -33,20 +28,14 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * consume function for every tuple.
  */
 public class ConsumerSink<R> extends StreamletOperator<R, R> {
+
   private static final long serialVersionUID = 8716140142187667638L;
   private SerializableConsumer<R> consumer;
-  private OutputCollector collector;
 
   public ConsumerSink(SerializableConsumer<R> consumer) {
     this.consumer = consumer;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    this.collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 7fdc3c9..ae1f6d1 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -17,14 +17,10 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
-import java.util.Map;
 import java.util.logging.Logger;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.streamlet.impl.operators.StreamletOperator;
 
@@ -33,19 +29,13 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * It basically logs every tuple.
  */
 public class LogSink<R> extends StreamletOperator<R, R> {
+
   private static final long serialVersionUID = -6392422646613189818L;
   private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
-  private OutputCollector collector;
 
   public LogSink() {
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    this.collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {