You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/27 22:55:17 UTC

samza git commit: SAMZA-1219; Add metrics for operator message received and execution times

Repository: samza
Updated Branches:
  refs/heads/master f7e173651 -> 92ae4c628


SAMZA-1219; Add metrics for operator message received and execution times

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #142 from prateekm/operator-metrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/92ae4c62
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/92ae4c62
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/92ae4c62

Branch: refs/heads/master
Commit: 92ae4c628abb3d113520ec47ca82f08c480123ad
Parents: f7e1736
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Thu Apr 27 15:55:11 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Thu Apr 27 15:55:11 2017 -0700

----------------------------------------------------------------------
 .../samza/operators/impl/OperatorImpl.java      | 136 +++++++++--
 .../samza/operators/impl/OperatorImplGraph.java |  34 ++-
 .../operators/impl/PartialJoinOperatorImpl.java |  48 ++--
 .../samza/operators/impl/RootOperatorImpl.java  |  36 ++-
 .../impl/SessionWindowOperatorImpl.java         |  52 -----
 .../samza/operators/impl/SinkOperatorImpl.java  |  25 +-
 .../operators/impl/StreamOperatorImpl.java      |  27 ++-
 .../operators/impl/WindowOperatorImpl.java      |  93 +++++---
 .../samza/operators/spec/OperatorSpec.java      |  14 +-
 .../operators/spec/PartialJoinOperatorSpec.java |   8 -
 .../samza/operators/spec/SinkOperatorSpec.java  |   7 -
 .../operators/spec/StreamOperatorSpec.java      |   7 -
 .../operators/spec/WindowOperatorSpec.java      |   9 -
 .../apache/samza/task/StreamOperatorTask.java   |   8 +-
 .../samza/operators/TestJoinOperator.java       |  12 +-
 .../samza/operators/TestWindowOperator.java     |   6 +-
 .../samza/operators/impl/TestOperatorImpl.java  | 226 +++++++++++++++----
 .../samza/operators/impl/TestOperatorImpls.java |  29 +--
 .../operators/impl/TestSinkOperatorImpl.java    |   7 +-
 .../operators/impl/TestStreamOperatorImpl.java  |  22 +-
 20 files changed, 547 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index b9a606b..d547869 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,9 +18,19 @@
  */
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.HighResolutionClock;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -29,60 +39,140 @@ import java.util.Set;
  * Abstract base class for all stream operator implementations.
  */
 public abstract class OperatorImpl<M, RM> {
+  private static final String METRICS_GROUP = OperatorImpl.class.getName();
 
-  private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
+  private boolean initialized;
+  private Set<OperatorImpl<RM, ?>> registeredOperators;
+  private HighResolutionClock highResClock;
+  private Counter numMessage;
+  private Timer handleMessageNs;
+  private Timer handleTimerNs;
 
   /**
-   * Register the next operator in the chain that this operator should propagate its output to.
-   * @param nextOperator  the next operator in the chain.
+   * Initialize this {@link OperatorImpl} and its user-defined functions.
+   *
+   * @param config  the {@link Config} for the task
+   * @param context  the {@link TaskContext} for the task
+   */
+  public final void init(Config config, TaskContext context) {
+    String opName = getOperatorSpec().getOpName();
+
+    if (initialized) {
+      throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
+    }
+
+    this.highResClock = createHighResClock(config);
+    registeredOperators = new HashSet<>();
+    MetricsRegistry metricsRegistry = context.getMetricsRegistry();
+    this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
+    this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
+    this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+
+    handleInit(config, context);
+
+    initialized = true;
+  }
+
+  /**
+   * Initialize this {@link OperatorImpl} and its user-defined functions.
+   *
+   * @param config  the {@link Config} for the task
+   * @param context  the {@link TaskContext} for the task
+   */
+  protected abstract void handleInit(Config config, TaskContext context);
+
+  /**
+   * Register an operator that this operator should propagate its results to.
+   *
+   * @param nextOperator  the next operator to propagate results to
    */
   void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
-    nextOperators.add(nextOperator);
+    if (!initialized) {
+      throw new IllegalStateException(
+          String.format("Attempted to register next operator before initializing operator %s.",
+              getOperatorSpec().getOpName()));
+    }
+    this.registeredOperators.add(nextOperator);
   }
 
   /**
-   * Perform the transformation required for this operator and call the downstream operators.
+   * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators.
+   * <p>
+   * Delegates to {@link #handleMessage(Object, MessageCollector, TaskCoordinator)} for handling the message.
    *
-   * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly.
+   * @param message  the input message
+   * @param collector  the {@link MessageCollector} for this message
+   * @param coordinator  the {@link TaskCoordinator} for this message
+   */
+  public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    this.numMessage.inc();
+    long startNs = this.highResClock.nanoTime();
+    Collection<RM> results = handleMessage(message, collector, coordinator);
+    long endNs = this.highResClock.nanoTime();
+    this.handleMessageNs.update(endNs - startNs);
+
+    results.forEach(rm ->
+        this.registeredOperators.forEach(op ->
+            op.onMessage(rm, collector, coordinator)));
+  }
+
+  /**
+   * Handle the incoming {@code message} and return the results to be propagated to registered operators.
    *
    * @param message  the input message
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
+   * @return  results of the transformation
    */
-  public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
+  protected abstract Collection<RM> handleMessage(M message, MessageCollector collector,
+      TaskCoordinator coordinator);
 
   /**
-   * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)}
+   * Handle timer ticks for this {@link OperatorImpl} and propagate the results and timer tick to registered operators.
+   * <p>
+   * Delegates to {@link #handleTimer(MessageCollector, TaskCoordinator)} for handling the timer tick.
    *
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  public final void onTick(MessageCollector collector, TaskCoordinator coordinator) {
-    onTimer(collector, coordinator);
-    nextOperators.forEach(sub -> sub.onTick(collector, coordinator));
+  public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    long startNs = this.highResClock.nanoTime();
+    Collection<RM> results = handleTimer(collector, coordinator);
+    long endNs = this.highResClock.nanoTime();
+    this.handleTimerNs.update(endNs - startNs);
+
+    results.forEach(rm ->
+        this.registeredOperators.forEach(op ->
+            op.onMessage(rm, collector, coordinator)));
+    this.registeredOperators.forEach(op ->
+        op.onTimer(collector, coordinator));
   }
 
   /**
-   * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output
-   * to registered downstream operators.
+   * Handle the the timer tick for this operator and return the results to be propagated to registered operators.
+   * <p>
+   * Defaults to a no-op implementation.
    *
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
+   * @return  results of the timed operation
    */
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+  protected Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    return Collections.emptyList();
   }
 
   /**
-   * Helper method to propagate the output of this operator to all registered downstream operators.
-   *
-   * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer}
-   * to propagate the operator output correctly.
+   * Get the {@link OperatorSpec} for this {@link OperatorImpl}.
    *
-   * @param outputMessage  output message
-   * @param collector  the {@link MessageCollector} in the context
-   * @param coordinator  the {@link TaskCoordinator} in the context
+   * @return the {@link OperatorSpec} for this {@link OperatorImpl}
    */
-  void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
-    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
+  protected abstract OperatorSpec<RM> getOperatorSpec();
+
+  private HighResolutionClock createHighResClock(Config config) {
+    if (new MetricsConfig(config).getMetricsTimerEnabled()) {
+      return System::nanoTime;
+    } else {
+      return () -> 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 8e492dc..d8ea592 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -104,20 +104,22 @@ public class OperatorImplGraph {
    * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
    *
    * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
-   * @param <M>  the type of messagess in the {@code source} {@link MessageStreamImpl}
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
+   * @param <M>  the type of messages in the {@code source} {@link MessageStreamImpl}
    * @return  root node for the {@link OperatorImpl} DAG
    */
-  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) {
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
+      Config config, TaskContext context) {
     // since the source message stream might have multiple operator specs registered on it,
     // create a new root node as a single point of entry for the DAG.
     RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    rootOperator.init(config, context);
     // create the pipeline/topology starting from the source
     source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
-        // pass in the source and context s.t. stateful stream operators can initialize their stores
+        // pass in the context so that operator implementations can initialize their functions
         OperatorImpl<M, ?> operatorImpl =
-            createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+            createAndRegisterOperatorImpl(registeredOperator, config, context);
         rootOperator.registerNextOperator(operatorImpl);
       });
     return rootOperator;
@@ -127,27 +129,26 @@ public class OperatorImplGraph {
    * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
    * {@link OperatorImpl}s.
    *
-   * @param operatorSpec  the operatorSpec registered with the {@code source}
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
+   * @param operatorSpec  the operatorSpec to create the {@link OperatorImpl} for
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
+   * @param <M>  type of input message
    * @return  the operator implementation for the operatorSpec
    */
   private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
-      MessageStreamImpl<M> source, Config config, TaskContext context) {
+      Config config, TaskContext context) {
     if (!operatorImpls.containsKey(operatorSpec)) {
-      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context);
       if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
         // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
         // so traverse and initialize and register the rest of the DAG.
         // initialize the corresponding operator function
-        operatorSpec.init(config, context);
+        operatorImpl.init(config, context);
         MessageStreamImpl nextStream = operatorSpec.getNextStream();
         if (nextStream != null) {
           Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
           registeredSpecs.forEach(registeredSpec -> {
-              OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context);
               operatorImpl.registerNextOperator(subImpl);
             });
         }
@@ -163,24 +164,21 @@ public class OperatorImplGraph {
   /**
    * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
    *
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
    * @param operatorSpec  the immutable {@link OperatorSpec} definition.
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
+   * @param <M>  type of input message
    * @return  the {@link OperatorImpl} implementation instance
    */
-  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source,
-      OperatorSpec operatorSpec, Config config, TaskContext context) {
+  private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) {
     if (operatorSpec instanceof StreamOperatorSpec) {
-      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
-      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+      return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context);
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
     } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock);
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index e4cb9c2..c7bdc22 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
@@ -35,6 +34,11 @@ import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
  * with buffered messages of type {@code JM} in the other stream.
@@ -47,35 +51,45 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
 
+  private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
   private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
-  private final int opId;
   private final Clock clock;
 
-  PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source,
+  private Counter keysRemoved;
+
+  PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec,
       Config config, TaskContext context, Clock clock) {
-    this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
-    this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
-    this.ttlMs = partialJoinOperatorSpec.getTtlMs();
-    this.opId = partialJoinOperatorSpec.getOpId();
+    this.partialJoinOpSpec = partialJoinOpSpec;
+    this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn();
+    this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn();
+    this.ttlMs = partialJoinOpSpec.getTtlMs();
     this.clock = clock;
   }
 
   @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+  protected void handleInit(Config config, TaskContext context) {
+    keysRemoved = context.getMetricsRegistry()
+        .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
+    this.thisPartialJoinFn.init(config, context);
+  }
+
+  @Override
+  public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
     K key = thisPartialJoinFn.getKey(message);
     thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
     PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
     long now = clock.currentTimeMillis();
     if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
       RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
-      this.propagateResult(joinResult, collector, coordinator);
+      return Collections.singletonList(joinResult);
     }
+    return Collections.emptyList();
   }
 
   @Override
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+  public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
     long now = clock.currentTimeMillis();
 
     KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
@@ -87,14 +101,18 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
       if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
         keysToRemove.add(entry.getKey());
       } else {
-        break;
+        break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order
       }
     }
 
     iterator.close();
     thisState.deleteAll(keysToRemove);
-
-    LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now);
+    keysRemoved.inc(keysToRemove.size());
+    return Collections.emptyList();
   }
 
+  @Override
+  protected OperatorSpec<RM> getOperatorSpec() {
+    return partialJoinOpSpec;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index eb9b5e2..0f18e97 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -18,9 +18,16 @@
  */
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * A no-op operator implementation that forwards incoming messages to all of its subscribers.
@@ -29,7 +36,32 @@ import org.apache.samza.task.TaskCoordinator;
 public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
 
   @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    this.propagateResult(message, collector, coordinator);
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  public Collection<M> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    return Collections.singletonList(message);
+  }
+
+  // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message
+  @Override
+  protected OperatorSpec<M> getOperatorSpec() {
+    return new OperatorSpec<M>() {
+      @Override
+      public MessageStreamImpl<M> getNextStream() {
+        return null;
+      }
+
+      @Override
+      public OpCode getOpCode() {
+        return OpCode.INPUT;
+      }
+
+      @Override
+      public int getOpId() {
+        return -1;
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
deleted file mode 100644
index 2bb362c..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Default implementation class of a {@link WindowOperatorSpec} for a session window.
- *
- * @param <M>  the type of input message
- * @param <RK>  the type of window key
- * @param <WV>  the type of window state
- */
-class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
-
-  private final WindowOperatorSpec<M, RK, WV> windowSpec;
-
-  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
-    this.windowSpec = windowSpec;
-  }
-
-  @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-  }
-
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    // This is to periodically check the timeout triggers to get the list of window states to be updated
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index f92fbfb..e82737f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -20,27 +20,44 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * Implementation for {@link SinkOperatorSpec}
  */
 class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
 
+  private final SinkOperatorSpec<M> sinkOpSpec;
   private final SinkFunction<M> sinkFn;
 
-  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
-    this.sinkFn = sinkOp.getSinkFn();
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) {
+    this.sinkOpSpec = sinkOpSpec;
+    this.sinkFn = sinkOpSpec.getSinkFn();
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+    this.sinkFn.init(config, context);
   }
 
   @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+  public Collection<M> handleMessage(M message, MessageCollector collector,
+      TaskCoordinator coordinator) {
     this.sinkFn.apply(message, collector, coordinator);
     // there should be no further chained operators since this is a terminal operator.
-    // hence we don't call #propogateResult() here.
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected OperatorSpec<M> getOperatorSpec() {
+    return sinkOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 644de20..bd4dce1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -19,13 +19,15 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+
 
 /**
  * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
@@ -35,15 +37,28 @@ import org.apache.samza.task.TaskCoordinator;
  */
 class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
 
+  private final StreamOperatorSpec<M, RM> streamOpSpec;
   private final FlatMapFunction<M, RM> transformFn;
 
-  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
-    this.transformFn = streamOperatorSpec.getTransformFn();
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec,
+      Config config, TaskContext context) {
+    this.streamOpSpec = streamOpSpec;
+    this.transformFn = streamOpSpec.getTransformFn();
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+    transformFn.init(config, context);
+  }
+
+  @Override
+  public Collection<RM> handleMessage(M message, MessageCollector collector,
+      TaskCoordinator coordinator) {
+    return this.transformFn.apply(message);
   }
 
   @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    // call the transform function and then for each output call propagateResult()
-    this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator));
+  protected OperatorSpec<RM> getOperatorSpec() {
+    return streamOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index cd3b1bc..b99f719 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -20,14 +20,16 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
 import org.apache.samza.operators.triggers.TimeTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.TriggerImpl;
 import org.apache.samza.operators.triggers.TriggerImpls;
-import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowKey;
@@ -36,6 +38,7 @@ import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 
 /**
@@ -61,9 +65,8 @@ import java.util.function.Function;
  * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}.
  * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet
  * or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A
- * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The
- * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream
- * operators.
+ * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted.
+ * The {@link WindowOperatorImpl} checks if the trigger fired and returns the result of the firing.
  *
  * @param <M> the type of the incoming message
  * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream}
@@ -74,56 +77,84 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
 
   private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);
 
+  private final WindowOperatorSpec<M, WK, WV> windowOpSpec;
+  private final Clock clock;
   private final WindowInternal<M, WK, WV> window;
   private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>();
-  TriggerScheduler<WK> triggerScheduler ;
+  private TriggerScheduler<WK> triggerScheduler;
 
   // The trigger state corresponding to each {@link TriggerKey}.
   private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>();
-  private final Clock clock;
 
-  public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) {
+  public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOpSpec, Clock clock) {
+    this.windowOpSpec = windowOpSpec;
     this.clock = clock;
-    this.window = spec.getWindow();
+    this.window = windowOpSpec.getWindow();
     this.triggerScheduler= new TriggerScheduler(clock);
   }
 
   @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+  protected void handleInit(Config config, TaskContext context) {
+    WindowInternal<M, WK, WV> window = windowOpSpec.getWindow();
+    if (window.getFoldLeftFunction() != null) {
+      window.getFoldLeftFunction().init(config, context);
+    }
+  }
+
+  @Override
+  public Collection<WindowPane<WK, WV>> handleMessage(
+      M message, MessageCollector collector, TaskCoordinator coordinator) {
     LOG.trace("Processing message envelope: {}", message);
+    List<WindowPane<WK, WV>> results = new ArrayList<>();
 
     WindowKey<WK> storeKey =  getStoreKey(message);
     WindowState<WV> existingState = store.get(storeKey);
     WindowState<WV> newState = applyFoldFunction(existingState, message);
 
-    LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp());
+    LOG.trace("New window value: {}, earliest timestamp: {}",
+        newState.getWindowValue(), newState.getEarliestTimestamp());
     store.put(storeKey, newState);
 
     if (window.getEarlyTrigger() != null) {
       TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
 
-      getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger())
-          .onMessage(triggerKey, message, collector, coordinator);
+      TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger());
+      Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+          triggerImplHandler.onMessage(triggerKey, message, collector, coordinator);
+      maybeTriggeredPane.ifPresent(results::add);
     }
 
     if (window.getDefaultTrigger() != null) {
       TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey);
-      getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger())
-          .onMessage(triggerKey, message, collector, coordinator);
+      TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger());
+      Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+          triggerImplHandler.onMessage(triggerKey, message, collector, coordinator);
+      maybeTriggeredPane.ifPresent(results::add);
     }
+
+    return results;
   }
 
   @Override
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+  public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    List<WindowPane<WK, WV>> results = new ArrayList<>();
+
     List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
 
     for (TriggerKey<WK> key : keys) {
       TriggerImplHandler triggerImplHandler = triggers.get(key);
       if (triggerImplHandler != null) {
-        triggerImplHandler.onTimer(key, collector, coordinator);
+        Optional<WindowPane<WK, WV>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator);
+        maybeTriggeredPane.ifPresent(results::add);
       }
     }
 
+    return results;
+  }
+
+  @Override
+  protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() {
+    return windowOpSpec;
   }
 
   /**
@@ -168,7 +199,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
     return newState;
   }
 
-  private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
+  private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
     TriggerImplHandler wrapper = triggers.get(triggerKey);
     if (wrapper != null) {
       LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
@@ -185,9 +216,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
   }
 
   /**
-   * Handles trigger firings, and propagates results to downstream operators.
+   * Handles trigger firings and returns the optional result.
    */
-  private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
+  private Optional<WindowPane<WK, WV>> onTriggerFired(
+      TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
     LOG.trace("Trigger key {} fired." , triggerKey);
 
     TriggerImplHandler wrapper = triggers.get(triggerKey);
@@ -196,11 +228,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
 
     if (state == null) {
       LOG.trace("No state found for triggerKey: {}", triggerKey);
-      return;
+      return Optional.empty();
     }
 
     WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state);
-    super.propagateResult(paneOutput, collector, coordinator);
 
     // Handle accumulation modes.
     if (window.getAccumulationMode() == AccumulationMode.DISCARDING) {
@@ -228,6 +259,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
     if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) {
       cancelTrigger(triggerKey, false);
     }
+
+    return Optional.of(paneOutput);
   }
 
   /**
@@ -248,7 +281,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
       windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal);
     }
 
-    WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
+    WindowPane<WK, WV> paneOutput =
+        new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
     LOG.trace("Emitting pane output for trigger key {}", triggerKey);
     return paneOutput;
   }
@@ -279,7 +313,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
       this.impl = impl;
     }
 
-    public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) {
+    public Optional<WindowPane<WK, WV>> onMessage(TriggerKey<WK> triggerKey, M message,
+        MessageCollector collector, TaskCoordinator coordinator) {
       if (!isCancelled) {
         LOG.trace("Forwarding callbacks for {}", message);
         impl.onMessage(message, triggerScheduler);
@@ -289,12 +324,14 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
           if (impl instanceof RepeatingTriggerImpl) {
             ((RepeatingTriggerImpl<M, WK>) impl).clear();
           }
-          onTriggerFired(triggerKey, collector, coordinator);
+          return onTriggerFired(triggerKey, collector, coordinator);
         }
       }
+      return Optional.empty();
     }
 
-    public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
+    public Optional<WindowPane<WK, WV>> onTimer(
+        TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
       if (impl.shouldFire() && !isCancelled) {
         LOG.trace("Triggering timer triggers");
 
@@ -302,8 +339,9 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
         if (impl instanceof RepeatingTriggerImpl) {
           ((RepeatingTriggerImpl<M, WK>) impl).clear();
         }
-        onTriggerFired(key, collector, coordinator);
+        return onTriggerFired(key, collector, coordinator);
       }
+      return Optional.empty();
     }
 
     public void cancel() {
@@ -315,5 +353,4 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
       return this.impl instanceof RepeatingTriggerImpl;
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 18090e2..cc3c4ab 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,9 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -34,6 +32,7 @@ import org.apache.samza.task.TaskContext;
 public interface OperatorSpec<OM> {
 
   enum OpCode {
+    INPUT,
     MAP,
     FLAT_MAP,
     FILTER,
@@ -64,10 +63,11 @@ public interface OperatorSpec<OM> {
   int getOpId();
 
   /**
-   * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
+   * Get the name for this operator based on its opCode and opId.
+   * @return  the name for this operator
    */
-  default void init(Config config, TaskContext context) { }
+  default String getOpName() {
+    return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index b1dc529..e85626f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -18,10 +18,8 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -88,10 +86,4 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
   public int getOpId() {
     return this.opId;
   }
-
-  @Override
-  public void init(Config config, TaskContext context) {
-    this.thisPartialJoinFn.init(config, context);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 7de85f3..0d135d3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,14 +18,12 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.stream.OutputStreamInternal;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
@@ -101,11 +99,6 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
     return this.opId;
   }
 
-  @Override
-  public void init(Config config, TaskContext context) {
-    this.sinkFn.init(config, context);
-  }
-
   /**
    * Creates a {@link SinkFunction} to send messages to the provided {@code output}.
    * @param outputStream  the {@link OutputStreamInternal} to send messages to

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index f9bbe2d..204e566 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,10 +18,8 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -71,9 +69,4 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
   public int getOpId() {
     return this.opId;
   }
-
-  @Override
-  public void init(Config config, TaskContext context) {
-    this.transformFn.init(config, context);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 9515e38..73b17b5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,11 +19,9 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -53,13 +51,6 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
   }
 
   @Override
-  public void init(Config config, TaskContext context) {
-    if (window.getFoldLeftFunction() != null) {
-      window.getFoldLeftFunction().init(config, context);
-    }
-  }
-
-  @Override
   public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
     return this.nextStream;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index be52565..4720298 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -118,17 +118,19 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
     SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
     InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
-    // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
     RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
     if (rootOperatorImpl != null) {
-      rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator);
+      // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
+      // before applying the msgBuilder.
+      Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
+      rootOperatorImpl.onMessage(message, collector, coordinator);
     }
   }
 
   @Override
   public final void window(MessageCollector collector, TaskCoordinator coordinator)  {
     operatorImplGraph.getAllRootOperators()
-        .forEach(rootOperator -> rootOperator.onTick(collector, coordinator));
+        .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 7a6f959..23b67aa 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,13 +19,10 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -42,6 +39,11 @@ import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -237,6 +239,8 @@ public class TestJoinOperator {
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
             new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
+    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
     Config config = mock(Config.class);
 
     StreamApplication sgb = new TestStreamApplication();

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index 6603137..597244e 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -26,9 +26,8 @@ import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.triggers.FiringType;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.testUtils.TestClock;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
@@ -36,11 +35,13 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -71,6 +72,7 @@ public class TestWindowOperator {
     runner = mock(ApplicationRunner.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka"));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index f978c3c..bd18f0b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,61 +18,205 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.core.IsEqual;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.argThat;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestOperatorImpl {
 
-  TestMessageEnvelope curInputMsg;
-  MessageCollector curCollector;
-  TaskCoordinator curCoordinator;
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleInitShouldThrow() {
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(mock(Config.class), mockTaskContext);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testRegisterNextOperatorBeforeInitShouldThrow() {
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
+    opImpl.registerNextOperator(mock(OperatorImpl.class));
+  }
+
+  @Test
+  public void testOnMessagePropagatesResults() {
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+    Object mockTestOpImplOutput = mock(Object.class);
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+    opImpl.init(mock(Config.class), mockTaskContext);
+
+    // register a couple of operators
+    OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
+    when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
+    when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+    mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+    opImpl.registerNextOperator(mockNextOpImpl1);
+
+    OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
+    when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
+    when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+    mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+    opImpl.registerNextOperator(mockNextOpImpl2);
+
+    // send a message to this operator
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator);
+
+    // verify that it propagates its handleMessage results to next operators
+    verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+    verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+  }
+
+  @Test
+  public void testOnMessageUpdatesMetrics() {
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    Counter mockCounter = mock(Counter.class);
+    Timer mockTimer = mock(Timer.class);
+    when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
+    when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
+
+    Object mockTestOpImplOutput = mock(Object.class);
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+    opImpl.init(mock(Config.class), mockTaskContext);
+
+    // send a message to this operator
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator);
+
+    // verify that it updates message count and timer metrics
+    verify(mockCounter, times(1)).inc();
+    verify(mockTimer, times(1)).update(anyLong());
+  }
+
+  @Test
+  public void testOnTimerPropagatesResultsAndTimer() {
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+    Object mockTestOpImplOutput = mock(Object.class);
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+    opImpl.init(mock(Config.class), mockTaskContext);
+
+    // register a couple of operators
+    OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
+    when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
+    when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+    mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+    opImpl.registerNextOperator(mockNextOpImpl1);
+
+    OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
+    when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
+    when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+    mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+    opImpl.registerNextOperator(mockNextOpImpl2);
+
+    // send a timer tick to this operator
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onTimer(mockCollector, mockCoordinator);
+
+    // verify that it propagates its handleTimer results to next operators
+    verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+    verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+
+    // verify that it propagates the timer tick to next operators
+    verify(mockNextOpImpl1, times(1)).handleTimer(mockCollector, mockCoordinator);
+    verify(mockNextOpImpl2, times(1)).handleTimer(mockCollector, mockCoordinator);
+  }
 
   @Test
-  public void testSubscribers() {
-    this.curInputMsg = null;
-    this.curCollector = null;
-    this.curCoordinator = null;
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
-      @Override
-      public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
-        TestOperatorImpl.this.curInputMsg = message;
-        TestOperatorImpl.this.curCollector = collector;
-        TestOperatorImpl.this.curCoordinator = coordinator;
-      }
-      @Override
-      public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-
-      }
-
-      };
-    // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
-    OperatorImpl mockSub = mock(OperatorImpl.class);
-    opImpl.registerNextOperator(mockSub);
-    TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
+  public void testOnTimerUpdatesMetrics() {
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    Counter mockMessageCounter = mock(Counter.class);
+    Timer mockTimer = mock(Timer.class);
+    when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter);
+    when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
+
+    Object mockTestOpImplOutput = mock(Object.class);
+    OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+    opImpl.init(mock(Config.class), mockTaskContext);
+
+    // send a message to this operator
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
-    verify(mockSub, times(1)).onNext(
-        argThat(new IsEqual<>(xOutput)),
-        argThat(new IsEqual<>(mockCollector)),
-        argThat(new IsEqual<>(mockCoordinator))
-    );
-    // verify onNext() is invoked correctly
-    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
-    opImpl.onNext(mockInput, mockCollector, mockCoordinator);
-    assertEquals(mockInput, this.curInputMsg);
-    assertEquals(mockCollector, this.curCollector);
-    assertEquals(mockCoordinator, this.curCoordinator);
+    opImpl.onTimer(mockCollector, mockCoordinator);
+
+    // verify that it updates metrics
+    verify(mockMessageCounter, times(0)).inc();
+    verify(mockTimer, times(1)).update(anyLong());
+  }
+
+  private static class TestOpImpl extends OperatorImpl<Object, Object> {
+    private final Object mockOutput;
+
+    TestOpImpl(Object mockOutput) {
+      this.mockOutput = mockOutput;
+    }
+
+    @Override
+    protected void handleInit(Config config, TaskContext context) {}
+
+    @Override
+    public Collection<Object> handleMessage(Object message,
+        MessageCollector collector, TaskCoordinator coordinator) {
+      return Collections.singletonList(mockOutput);
+    }
+
+    @Override
+    public Collection<Object> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+      return Collections.singletonList(mockOutput);
+    }
+
+    @Override
+    protected OperatorSpec<Object> getOperatorSpec() {
+      return new TestOpSpec();
+    }
+  }
+
+  private static class TestOpSpec implements OperatorSpec<Object> {
+    @Override
+    public MessageStreamImpl<Object> getNextStream() {
+      return null;
+    }
+
+    @Override
+    public OpCode getOpCode() {
+      return OpCode.INPUT;
+    }
+
+    @Override
+    public int getOpId() {
+      return -1;
+    }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 267cdfc..a75fadb 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.TestMessageStreamImplUtil;
@@ -26,11 +27,10 @@ import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
@@ -62,14 +62,15 @@ public class TestOperatorImpls {
 
   @Before
   public void prep() throws NoSuchFieldException, NoSuchMethodException {
-    nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
+    nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
     nextOperatorsField.setAccessible(true);
 
-    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
         OperatorSpec.class, Config.class, TaskContext.class);
     createOpMethod.setAccessible(true);
 
-    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
+        Config.class, TaskContext.class);
     createOpsMethod.setAccessible(true);
   }
 
@@ -79,13 +80,12 @@ public class TestOperatorImpls {
     WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
     WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
     when(mockWnd.getWindow()).thenReturn(windowInternal);
-    MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
     Config mockConfig = mock(Config.class);
     TaskContext mockContext = mock(TaskContext.class);
 
     OperatorImplGraph opGraph = new OperatorImplGraph();
     OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
-        createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
+        createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
     assertTrue(opImpl instanceof WindowOperatorImpl);
     Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
     wndInternalField.setAccessible(true);
@@ -96,7 +96,7 @@ public class TestOperatorImpls {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
     when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof StreamOperatorImpl);
     Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
     txfmFnField.setAccessible(true);
@@ -106,7 +106,7 @@ public class TestOperatorImpls {
     SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
     SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof SinkOperatorImpl);
     Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
     sinkFnField.setAccessible(true);
@@ -114,8 +114,7 @@ public class TestOperatorImpls {
 
     // get join operator
     PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof PartialJoinOperatorImpl);
   }
 
@@ -124,6 +123,7 @@ public class TestOperatorImpls {
     // test creation of empty chain
     MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
     TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     Config mockConfig = mock(Config.class);
     OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
@@ -134,8 +134,9 @@ public class TestOperatorImpls {
   public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
     // test creation of linear chain
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     Config mockConfig = mock(Config.class);
     testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
     OperatorImplGraph opGraph = new OperatorImplGraph();
@@ -154,8 +155,9 @@ public class TestOperatorImpls {
   public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
     // test creation of broadcast chain
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     Config mockConfig = mock(Config.class);
     testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
     testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
@@ -187,6 +189,7 @@ public class TestOperatorImpls {
     MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
     MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     Config mockConfig = mock(Config.class);
     input1
         .join(input2,

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index abd7740..1c01e57 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -27,7 +27,10 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestSinkOperatorImpl {
@@ -44,7 +47,7 @@ public class TestSinkOperatorImpl {
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
 
-    sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator);
+    sinkImpl.handleMessage(mockMsg, mockCollector, mockCoordinator);
     verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 9dd161a..36d7b92 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,10 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
@@ -31,7 +28,15 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestStreamOperatorImpl {
@@ -42,10 +47,10 @@ public class TestStreamOperatorImpl {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
-    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
     Config mockConfig = mock(Config.class);
     TaskContext mockContext = mock(TaskContext.class);
-    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
+    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
+        spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
     TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
     TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
     Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
@@ -54,8 +59,9 @@ public class TestStreamOperatorImpl {
     when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+    Collection<TestOutputMessageEnvelope> results = opImpl
+        .handleMessage(inMsg, mockCollector, mockCoordinator);
     verify(txfmFn, times(1)).apply(inMsg);
-    verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
+    assertEquals(results, mockOutputs);
   }
 }