You are viewing a plain-text version of this content. (The link to the canonical version did not survive in this plain-text copy.)
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/29 00:11:45 UTC

[2/3] samza git commit: SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 99496eb..9b747bc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,9 +18,14 @@
  */
 package org.apache.samza.operators.impl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -36,6 +41,8 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -50,6 +57,7 @@ import java.util.Map;
  * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
  */
 public class OperatorImplGraph {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class);
 
   /**
    * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
@@ -84,10 +92,27 @@ public class OperatorImplGraph {
    */
   public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) {
     this.clock = clock;
+
+    // Cast needed for getJobModel()/registerObject(); throws ClassCastException for other TaskContexts.
+    TaskContextImpl taskContext = (TaskContextImpl) context;
+    // Upstream producer task count per intermediate (partitionBy) stream; empty if there are none.
+    Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(streamGraph) ?
+        getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
+            getIntermediateToInputStreamsMap(streamGraph)) :
+        Collections.emptyMap();
+    producerTaskCounts.forEach((stream, count) -> LOG.info("{} has {} producer tasks.", stream, count));
+
+    // register end-of-stream state for this task's SSPs, keyed by the state class name
+    taskContext.registerObject(EndOfStreamStates.class.getName(),
+        new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts));
+    // register watermark state for this task's SSPs, keyed by the state class name
+    taskContext.registerObject(WatermarkStates.class.getName(),
+        new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));
+
     streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
         SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
         InputOperatorImpl inputOperatorImpl =
-            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context);
+            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
       });
   }
@@ -128,17 +153,18 @@ public class OperatorImplGraph {
    * @return  the operator implementation for the operatorSpec
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
-      Config config, TaskContext context) {
+      SystemStream inputStream, Config config, TaskContext context) {
     if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
       OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
       operatorImpl.init(config, context);
+      operatorImpl.registerInputStream(inputStream);
       operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
 
       Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
       registeredSpecs.forEach(registeredSpec -> {
-          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context);
+          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
           operatorImpl.registerNextOperator(nextImpl);
         });
       return operatorImpl;
@@ -246,4 +272,70 @@ public class OperatorImplGraph {
       }
     };
   }
+
+  private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) {  // true iff an input is also an output
+    return streamGraph.getInputOperators().keySet().stream().anyMatch(streamGraph.getOutputStreams()::containsKey);
+  }
+
+  /**
+   * Counts, for every intermediate stream, the distinct tasks consuming any of the input streams that feed it.
+   * @param streamToConsumerTasks mapping from each input stream to the names of the tasks consuming it
+   * @param intermediateToInputStreams mapping from each intermediate stream to the input streams feeding it
+   * @return mapping from each intermediate stream to its number of distinct producer tasks
+   */
+  static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams(
+      Multimap<SystemStream, String> streamToConsumerTasks,
+      Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
+    Map<SystemStream, Integer> result = new HashMap<>();
+    for (Map.Entry<SystemStream, Collection<SystemStream>> entry : intermediateToInputStreams.asMap().entrySet()) {
+      int producerCount = entry.getValue().stream()
+          .flatMap(input -> streamToConsumerTasks.get(input).stream())
+          .collect(Collectors.toSet()).size();
+      result.put(entry.getKey(), producerCount);
+    }
+    return result;
+  }
+
+  /**
+   * Maps every stream in the job model to the names of the tasks that consume at least one of its partitions.
+   * @param jobModel the job model describing all containers and their tasks
+   * @return multimap from stream to consumer task names
+   */
+  static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) {
+    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
+    jobModel.getContainers().values().stream()
+        .flatMap(containerModel -> containerModel.getTasks().values().stream())
+        .forEach(taskModel -> {
+            String taskName = taskModel.getTaskName().getTaskName();
+            taskModel.getSystemStreamPartitions()
+                .forEach(ssp -> streamToConsumerTasks.put(ssp.getSystemStream(), taskName));
+          });
+    return streamToConsumerTasks;
+  }
+
+  /**
+   * Maps each stream written by a partitionBy operator to the graph input streams that reach it.
+   * @param streamGraph the user {@link StreamGraphImpl} instance
+   * @return multimap from partitionBy output (intermediate) stream to its upstream input streams
+   */
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
+    Multimap<SystemStream, SystemStream> intermediateToInputs = HashMultimap.create();
+    streamGraph.getInputOperators().forEach((inputSpec, inputOpSpec) ->
+        computeOutputToInput(inputSpec.toSystemStream(), inputOpSpec, intermediateToInputs));
+    // every partitionBy output reachable from each input has now been recorded
+    return intermediateToInputs;
+  }
+
+  private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
+      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+    if (!(opSpec instanceof OutputOperatorSpec)) {  // keep walking downstream until an output operator
+      Collection<OperatorSpec> downstream = opSpec.getRegisteredOperatorSpecs();
+      downstream.forEach(next -> computeOutputToInput(input, next, outputToInputStreams));
+      return;
+    }
+    OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
+    if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {  // only partitionBy outputs are recorded
+      outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index f212b3e..205bba6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -21,12 +21,16 @@ package org.apache.samza.operators.impl;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.system.ControlMessage;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -39,10 +43,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
 
   private final OutputOperatorSpec<M> outputOpSpec;
   private final OutputStreamImpl<?, ?, M> outputStream;
+  private final String taskName;
+  private final ControlMessageSender controlMessageSender;
 
   OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
+    this.taskName = context.getTaskName().getTaskName();
+    // Sender for this task's end-of-stream/watermark control messages; built from the stream metadata cache.
+    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
+    this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
   }
 
   @Override
@@ -71,12 +81,22 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected long handleWatermark(Watermark inputWatermark,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
+  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
     if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
-      inputWatermark.propagate(outputStream.getStreamSpec().toSystemStream());
+      sendControlMessage(new EndOfStreamMessage(taskName), collector);  // only partitionBy outputs forward EOS
     }
-    return inputWatermark.getTimestamp();
+  }
+
+  @Override
+  protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
+    if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
+      sendControlMessage(new WatermarkMessage(watermark, taskName), collector);  // forward to the partitionBy stream
+    }
+    return watermark;  // returned unchanged
+  }
+
+  private void sendControlMessage(ControlMessage message, MessageCollector collector) {
+    SystemStream targetStream = outputStream.getStreamSpec().toSystemStream();  // stream written by this operator
+    controlMessageSender.send(message, targetStream, collector);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
new file mode 100644
index 0000000..0295626
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks the watermarks arriving on the input/intermediate streams of a task. For each SSP it keeps the latest
+ * watermark timestamp reported by every upstream task, and uses the minimum as the consolidated watermark.
+ *
+ * This class is thread-safe. However, processing messages in parallel within a task may reorder them and make the
+ * watermarks inaccurate: a watermark might be emitted before all preceding messages have been fully processed.
+ */
+class WatermarkStates {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkStates.class);
+
+  public static final long WATERMARK_NOT_EXIST = -1;
+
+  private static final class WatermarkState {
+    private final int expectedTotal;  // number of upstream tasks that must report before a watermark is published
+    private final Map<String, Long> timestamps = new HashMap<>();  // highest timestamp seen per upstream task name
+    private volatile long watermarkTime = WATERMARK_NOT_EXIST;
+
+    WatermarkState(int expectedTotal) {
+      this.expectedTotal = expectedTotal;
+    }
+
+    synchronized void update(long timestamp, String taskName) {
+      if (taskName != null) {
+        Long ts = timestamps.get(taskName);
+        if (ts != null && ts > timestamp) {
+          LOG.warn("Incoming watermark {} is smaller than existing watermark {} for upstream task {}",
+              timestamp, ts, taskName);
+        } else {
+          timestamps.put(taskName, timestamp);
+        }
+      }
+
+      // Publish a watermark only once all expected upstream tasks have reported. Source streams expect 0, so a
+      // message without a task name sets the watermark directly; intermediate streams take the minimum across
+      // upstream tasks. NOTE(review): if more distinct task names than expectedTotal ever report (e.g. a source
+      // message carrying a task name), the watermark stops advancing.
+      if (timestamps.size() == expectedTotal) {
+        Optional<Long> min = timestamps.values().stream().min(Long::compare);
+        watermarkTime = min.orElse(timestamp);
+      }
+    }
+
+    long getWatermarkTime() {
+      return watermarkTime;
+    }
+  }
+
+  private final Map<SystemStreamPartition, WatermarkState> watermarkStates;  // unmodifiable after construction
+
+  WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
+    // One state per SSP; streams absent from producerTaskCounts (e.g. sources) expect 0 upstream producers.
+    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
+    ssps.forEach(ssp ->
+        states.put(ssp, new WatermarkState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0))));
+    this.watermarkStates = Collections.unmodifiableMap(states);
+  }
+
+  /**
+   * Update the state upon receiving a watermark message.
+   * @param watermarkMessage message of {@link WatermarkMessage}
+   * @param ssp system stream partition the message arrived on; an untracked SSP is logged as an error and ignored
+   */
+  void update(WatermarkMessage watermarkMessage, SystemStreamPartition ssp) {
+    WatermarkState state = watermarkStates.get(ssp);
+    if (state != null) {
+      state.update(watermarkMessage.getTimestamp(), watermarkMessage.getTaskName());
+    } else {
+      LOG.error("SSP {} doesn't have watermark states", ssp);
+    }
+  }
+
+  // Minimum watermark over the tracked partitions of systemStream. WATERMARK_NOT_EXIST when none is tracked, and
+  // (for non-negative timestamps) also while any of those partitions has not published a watermark yet.
+  long getWatermark(SystemStream systemStream) {
+    return watermarkStates.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
+        .map(entry -> entry.getValue().getWatermarkTime())
+        .min(Long::compare)
+        .orElse(WATERMARK_NOT_EXIST);
+  }
+
+  /* package private for testing */
+  long getWatermarkPerSSP(SystemStreamPartition ssp) {
+    return watermarkStates.get(ssp).getWatermarkTime();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 6fbc3c1..773f742 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.StreamSpec;
 
 import java.util.function.BiFunction;
@@ -49,4 +50,9 @@ public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
   public BiFunction<K, V, M> getMsgBuilder() {
     return this.msgBuilder;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;  // input operators never expose a WatermarkFunction
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 16f59d7..f4fe0fd 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -70,4 +71,9 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
   public long getTtlMs() {
     return ttlMs;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {  // the join function, only if it also implements WatermarkFunction
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index f64e123..4047d92 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 import java.util.Collection;
 import java.util.LinkedHashSet;
@@ -118,4 +119,6 @@ public abstract class OperatorSpec<M, OM> {
   public final String getOpName() {
     return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
   }
+
+  public abstract WatermarkFunction getWatermarkFn();  // operator's user function if a WatermarkFunction, else null
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index e6767ec..9759392 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,6 +19,8 @@
 package org.apache.samza.operators.spec;
 
 
+import org.apache.samza.operators.functions.WatermarkFunction;
+
 /**
  * The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a
  * {@link org.apache.samza.system.SystemStream}.
@@ -52,4 +54,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
   public OutputStreamImpl<?, ?, M> getOutputStream() {
     return this.outputStream;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;  // output operators never expose a WatermarkFunction
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 2b55d95..1145be8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -48,4 +49,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {  // the sink function, only if it also implements WatermarkFunction
+    return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 1f2f683..aace2e2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -46,4 +47,9 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {  // the transform function, only if it also implements WatermarkFunction
+    return transformFn instanceof WatermarkFunction ? (WatermarkFunction) transformFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 0937499..75f1427 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.triggers.AnyTrigger;
 import org.apache.samza.operators.triggers.RepeatingTrigger;
 import org.apache.samza.operators.triggers.TimeBasedTrigger;
@@ -109,4 +111,10 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
     }
     return timeBasedTriggers;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    FoldLeftFunction<?, ?> fn = window.getFoldLeftFunction();  // only the fold function is checked
+    return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 0b98ec6..2ed559f 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -21,9 +21,9 @@ package org.apache.samza.serializers;
 
 import java.util.Arrays;
 import org.apache.samza.SamzaException;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.message.IntermediateMessageType;
-import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.WatermarkMessage;
 import org.codehaus.jackson.type.TypeReference;
 
 
@@ -86,16 +86,16 @@ public class IntermediateMessageSerde implements Serde<Object> {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final IntermediateMessageType type = IntermediateMessageType.values()[bytes[0]];
+      final MessageType type = MessageType.values()[bytes[0]];
       final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length);
       switch (type) {
         case USER_MESSAGE:
           object = userMessageSerde.fromBytes(data);
           break;
-        case WATERMARK_MESSAGE:
+        case WATERMARK:
           object = watermarkSerde.fromBytes(data);
           break;
-        case END_OF_STREAM_MESSAGE:
+        case END_OF_STREAM:
           object = eosSerde.fromBytes(data);
           break;
         default:
@@ -117,15 +117,15 @@ public class IntermediateMessageSerde implements Serde<Object> {
   @Override
   public byte[] toBytes(Object object) {
     final byte [] data;
-    final IntermediateMessageType type = IntermediateMessageType.of(object);
+    final MessageType type = MessageType.of(object);
     switch (type) {
       case USER_MESSAGE:
         data = userMessageSerde.toBytes(object);
         break;
-      case WATERMARK_MESSAGE:
+      case WATERMARK:
         data = watermarkSerde.toBytes((WatermarkMessage) object);
         break;
-      case END_OF_STREAM_MESSAGE:
+      case END_OF_STREAM:
         data = eosSerde.toBytes((EndOfStreamMessage) object);
         break;
       default:

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index e57a89f..e2fea95 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -21,11 +21,7 @@ package org.apache.samza.task;
 
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
 
 
 /**
@@ -34,7 +30,7 @@ import org.apache.samza.system.SystemStream;
  * the callbacks once it's done. If the thread pool is null, it follows the legacy
  * synchronous model to execute the tasks on the run loop thread.
  */
-public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask, ControlMessageListenerTask {
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask {
   private final StreamTask wrappedTask;
   private final ExecutorService executor;
 
@@ -100,20 +96,4 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi
       ((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, coordinator);
     }
   }
-
-  @Override
-  public IOGraph getIOGraph() {
-    if (wrappedTask instanceof ControlMessageListenerTask) {
-      return ((ControlMessageListenerTask) wrappedTask).getIOGraph();
-    }
-    return null;
-  }
-
-  @Override
-  public void onWatermark(Watermark watermark, SystemStream stream, MessageCollector collector, TaskCoordinator coordinator) {
-    if (wrappedTask instanceof ControlMessageListenerTask) {
-      ((ControlMessageListenerTask) wrappedTask).onWatermark(watermark, stream, collector, coordinator);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 16b7e40..0074e24 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,25 +21,28 @@ package org.apache.samza.task;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * A {@link StreamTask} implementation that brings all the operator API implementation components together and
  * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
  */
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask, ControlMessageListenerTask {
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
 
   private final StreamApplication streamApplication;
   private final ApplicationRunner runner;
@@ -47,7 +50,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   private OperatorImplGraph operatorImplGraph;
   private ContextManager contextManager;
-  private IOGraph ioGraph;
 
   /**
    * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
@@ -91,7 +93,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
     // create the operator impl DAG corresponding to the logical operator spec DAG
     this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
-    this.ioGraph = streamGraph.toIOGraph();
   }
 
   /**
@@ -110,7 +111,21 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
     InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
     if (inputOpImpl != null) {
-      inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+      switch (MessageType.of(ime.getMessage())) {
+        case USER_MESSAGE:
+          inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+          break;
+
+        case END_OF_STREAM:
+          EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
+          inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
+          break;
+
+        case WATERMARK:
+          WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
+          inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
+          break;
+      }
     }
   }
 
@@ -121,22 +136,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   }
 
   @Override
-  public IOGraph getIOGraph() {
-    return ioGraph;
-  }
-
-  @Override
-  public final void onWatermark(Watermark watermark,
-      SystemStream systemStream,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
-    InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
-    if (inputOpImpl != null) {
-      inputOpImpl.onWatermark(watermark, collector, coordinator);
-    }
-  }
-
-  @Override
   public void close() throws Exception {
     if (this.contextManager != null) {
       this.contextManager.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 8c739d4..1b2ce80 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -24,6 +24,7 @@ package org.apache.samza.checkpoint
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -193,7 +194,7 @@ class OffsetManager(
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
-    if (offset != null) {
+    if (offset != null && !offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
       lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
     }
   }
@@ -216,6 +217,10 @@ class OffsetManager(
     }
   }
 
+  def setStartingOffset(taskName: TaskName, ssp: SystemStreamPartition, offset: String): Unit = {
+    startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
+  }
+
   /**
     * Gets a snapshot of all the current offsets for the specified task. This is useful to
     * ensure there are no concurrent updates to the offsets between when this method is

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5a19d90..acec365 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,25 +20,17 @@
 package org.apache.samza.container
 
 
-import com.google.common.collect.HashMultimap
-import com.google.common.collect.Multimap
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.control.ControlMessageListenerTask
-import org.apache.samza.control.ControlMessageUtils
-import org.apache.samza.control.EndOfStreamManager
-import org.apache.samza.control.WatermarkManager
 import org.apache.samza.job.model.JobModel
-import org.apache.samza.message.MessageType
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.ClosableTask
@@ -47,34 +39,11 @@ import org.apache.samza.task.InitableTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCallbackFactory
-import org.apache.samza.task.TaskContext
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.task.WindowableTask
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
-object TaskInstance {
-  /**
-   * Build a map from a stream to its consumer tasks
-   * @param jobModel job model which contains ssp-to-task assignment
-   * @return the map of input stream to tasks
-   */
-  def buildInputToTasks(jobModel: JobModel): Multimap[SystemStream, String] = {
-    val streamToTasks: Multimap[SystemStream, String] = HashMultimap.create[SystemStream, String]
-    if (jobModel != null) {
-      for (containerModel <- jobModel.getContainers.values) {
-        for (taskModel <- containerModel.getTasks.values) {
-          for (ssp <- taskModel.getSystemStreamPartitions) {
-            streamToTasks.put(ssp.getSystemStream, taskModel.getTaskName.toString)
-          }
-        }
-      }
-    }
-    return streamToTasks
-  }
-}
 
 class TaskInstance(
   val task: Any,
@@ -97,35 +66,10 @@ class TaskInstance(
   val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
-  val isControlMessageListener = task.isInstanceOf[ControlMessageListenerTask]
-
-  val context = new TaskContext {
-    var userContext: Object = null;
-    def getMetricsRegistry = metrics.registry
-    def getSystemStreamPartitions = systemStreamPartitions.asJava
-    def getStore(storeName: String) = if (storageManager != null) {
-      storageManager(storeName)
-    } else {
-      warn("No store found for name: %s" format storeName)
 
-      null
-    }
-    def getTaskName = taskName
-    def getSamzaContainerContext = containerContext
+  val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
+                                    storageManager, jobModel, streamMetadataCache)
 
-    override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
-      val startingOffsets = offsetManager.startingOffsets
-      offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
-    }
-
-    override def setUserContext(context: Object): Unit = {
-      userContext = context
-    }
-
-    override def getUserContext: Object = {
-      userContext
-    }
-  }
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
   // the one here.
@@ -133,10 +77,6 @@ class TaskInstance(
     scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
   systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
 
-  val inputToTasksMapping = TaskInstance.buildInputToTasks(jobModel)
-  var endOfStreamManager: EndOfStreamManager = null
-  var watermarkManager: WatermarkManager = null
-
   val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_))
 
   def registerMetrics {
@@ -169,22 +109,6 @@ class TaskInstance(
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
-
-    if (isControlMessageListener) {
-      endOfStreamManager = new EndOfStreamManager(taskName.getTaskName,
-                                                  task.asInstanceOf[ControlMessageListenerTask],
-                                                  inputToTasksMapping,
-                                                  systemStreamPartitions.asJava,
-                                                  streamMetadataCache,
-                                                  collector)
-
-      watermarkManager = new WatermarkManager(taskName.getTaskName,
-                                                  task.asInstanceOf[ControlMessageListenerTask],
-                                                  inputToTasksMapping,
-                                                  systemStreamPartitions.asJava,
-                                                  streamMetadataCache,
-                                                  collector)
-    }
   }
 
   def registerProducers {
@@ -223,51 +147,20 @@ class TaskInstance(
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
         format (taskName, envelope.getSystemStreamPartition))
 
-      MessageType.of(envelope.getMessage) match {
-        case MessageType.USER_MESSAGE =>
-          if (isAsyncTask) {
-            exceptionHandler.maybeHandle {
-             val callback = callbackFactory.createCallback()
-             task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
-            }
-          }
-          else {
-            exceptionHandler.maybeHandle {
-             task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
-            }
-
-            trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
-              format(taskName, envelope.getSystemStreamPartition, envelope.getOffset))
-
-            offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
-          }
+      if (isAsyncTask) {
+        exceptionHandler.maybeHandle {
+          val callback = callbackFactory.createCallback()
+          task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
+        }
+      } else {
+        exceptionHandler.maybeHandle {
+         task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
+        }
 
-        case MessageType.END_OF_STREAM =>
-          if (isControlMessageListener) {
-            // handle eos synchronously.
-            runSync(callbackFactory) {
-              endOfStreamManager.update(envelope, coordinator)
-            }
-          } else {
-            warn("Ignore end-of-stream message due to %s not implementing ControlMessageListener."
-              format(task.getClass.toString))
-          }
+        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+          format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
-        case MessageType.WATERMARK =>
-          if (isControlMessageListener) {
-            // handle watermark synchronously in the run loop thread.
-            // we might consider running it asynchronously later
-            runSync(callbackFactory) {
-              val watermark = watermarkManager.update(envelope)
-              if (watermark != null) {
-                val stream = envelope.getSystemStreamPartition.getSystemStream
-                task.asInstanceOf[ControlMessageListenerTask].onWatermark(watermark, stream, collector, coordinator)
-              }
-            }
-          } else {
-            warn("Ignore watermark message due to %s not implementing ControlMessageListener."
-              format(task.getClass.toString))
-          }
+        offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
       }
     }
   }
@@ -343,38 +236,32 @@ class TaskInstance(
    * it's already catched-up.
    */
   private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
-    systemAdmins match {
-      case null => {
-        warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
-        ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
-      }
-      case others => {
-        val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
-            .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
-        val system = envelope.getSystemStreamPartition.getSystem
-        others(system).offsetComparator(envelope.getOffset, startingOffset) match {
-          case null => {
-            info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
-            ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
-          }
-          case result => {
-            if (result >= 0) {
-              info(envelope.getSystemStreamPartition.toString + " is catched up.")
-              ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+    if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
+      ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+    } else {
+      systemAdmins match {
+        case null => {
+          warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
+          ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+        }
+        case others => {
+          val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
+              .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
+          val system = envelope.getSystemStreamPartition.getSystem
+          others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+            case null => {
+              info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
+              ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
+            }
+            case result => {
+              if (result >= 0) {
+                info(envelope.getSystemStreamPartition.toString + " is catched up.")
+                ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+              }
             }
           }
         }
       }
     }
   }
-
-  private def runSync(callbackFactory: TaskCallbackFactory)(runCodeBlock: => Unit) = {
-    val callback = callbackFactory.createCallback()
-    try {
-      runCodeBlock
-      callback.complete()
-    } catch {
-      case t: Throwable => callback.failure(t)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 0100c78..76594ae 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -20,12 +20,13 @@
 package org.apache.samza.serializers
 
 import org.apache.samza.SamzaException
-import org.apache.samza.message.ControlMessage
-import org.apache.samza.message.WatermarkMessage
+import org.apache.samza.system.ControlMessage
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.config.StorageConfig
+import org.apache.samza.system.WatermarkMessage
+
 
 class SerdeManager(
   serdes: Map[String, Serde[Object]] = Map(),

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java b/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
deleted file mode 100644
index 8351802..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.ControlMessage;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.task.MessageCollector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestControlMessageUtils {
-
-  @Test
-  public void testSendControlMessage() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-
-    SystemStream systemStream = new SystemStream("test-system", "test-stream");
-    Set<Integer> partitions = new HashSet<>();
-    MessageCollector collector = mock(MessageCollector.class);
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        assertEquals(envelope.getSystemStream(), systemStream);
-        return null;
-      }).when(collector).send(any());
-
-    ControlMessageUtils.sendControlMessage(mock(ControlMessage.class), systemStream, metadataCache, collector);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testCalculateUpstreamTaskCounts() {
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream input3 = new SystemStream("test-system", "input-stream-3");
-
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    TaskName t0 = new TaskName("task 0"); //consume input1 and input2
-    TaskName t1 = new TaskName("task 1"); //consume input1 and input2 and input 3
-    TaskName t2 = new TaskName("task 2"); //consume input2 and input 3
-    inputToTasks.put(input1, t0.getTaskName());
-    inputToTasks.put(input1, t1.getTaskName());
-    inputToTasks.put(input2, t0.getTaskName());
-    inputToTasks.put(input2, t1.getTaskName());
-    inputToTasks.put(input2, t2.getTaskName());
-    inputToTasks.put(input3, t1.getTaskName());
-    inputToTasks.put(input3, t2.getTaskName());
-
-    StreamSpec inputSpec2 = new StreamSpec("input-stream-2", "input-stream-2", "test-system");
-    StreamSpec inputSpec3 = new StreamSpec("input-stream-3", "input-stream-3", "test-system");
-    StreamSpec intSpec1 = new StreamSpec("int-stream-1", "int-stream-1", "test-system");
-    StreamSpec intSpec2 = new StreamSpec("int-stream-2", "int-stream-2", "test-system");
-
-    List<IOGraph.IONode> nodes = new ArrayList<>();
-    IOGraph.IONode node = new IOGraph.IONode(intSpec1, true);
-    node.addInput(inputSpec2);
-    nodes.add(node);
-    node = new IOGraph.IONode(intSpec2, true);
-    node.addInput(inputSpec3);
-    nodes.add(node);
-    IOGraph ioGraph = new IOGraph(nodes);
-
-    Map<SystemStream, Integer> counts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, ioGraph);
-    assertEquals(counts.get(intSpec1.toSystemStream()).intValue(), 3);
-    assertEquals(counts.get(intSpec2.toSystemStream()).intValue(), 2);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
deleted file mode 100644
index cc70b6b..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestEndOfStreamManager {
-  StreamMetadataCache metadataCache;
-
-  @Before
-  public void setup() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-  }
-
-  @Test
-  public void testUpdateFromInputSource() {
-    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class));
-    assertTrue(manager.isEndOfStream(ssp.getSystemStream()));
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStream() {
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
-    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
-
-    int envelopeCount = 4;
-    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
-    // verify the first three messages won't result in end-of-stream
-    for (int i = 0; i < 3; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[0].getSystemStream()));
-    }
-    // the fourth message will end the stream
-    manager.update(envelopes[3], coordinator);
-    assertTrue(manager.isEndOfStream(ssps[0].getSystemStream()));
-    assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-
-    // stream2 has two partitions assigned to this task, so it requires a message from each partition to end it
-    envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    // verify the messages for the partition 0 won't result in end-of-stream
-    for (int i = 0; i < 4; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-    }
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-    }
-    // the fourth message will end the stream
-    manager.update(envelopes[3], coordinator);
-    assertTrue(manager.isEndOfStream(ssps[1].getSystemStream()));
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStreamWith2Tasks() {
-    SystemStreamPartition[] ssps0 = new SystemStreamPartition[2];
-    ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
-    ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-
-    SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
-    TaskName t0 = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps0) {
-      streamToTasks.put(ssp.getSystemStream(), t0.getTaskName());
-    }
-
-    TaskName t1 = new TaskName("Task 1");
-    streamToTasks.put(ssp1, t1.getTaskName());
-
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system"));
-    inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system"));
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-
-    EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class));
-    assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream()));
-    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class));
-    assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream()));
-    verify(manager0).sendEndOfStream(any(), anyInt());
-
-    EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks, Collections.singleton(
-        ssp1), null, null));
-    doNothing().when(manager1).sendEndOfStream(any(), anyInt());
-    manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class));
-    assertTrue(manager1.isEndOfStream(ssp1.getSystemStream()));
-    verify(manager1).sendEndOfStream(any(), anyInt());
-  }
-
-  @Test
-  public void testSendEndOfStream() {
-    StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system");
-    StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system");
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true);
-
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    for (int i = 0; i < 8; i++) {
-      inputToTasks.put(input.toSystemStream(), "Task " + i);
-    }
-
-    MessageCollector collector = mock(MessageCollector.class);
-    TaskName taskName = new TaskName("Task 0");
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(),
-        listener,
-        inputToTasks,
-        Collections.EMPTY_SET,
-        metadataCache,
-        collector);
-
-    Set<Integer> partitions = new HashSet<>();
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage();
-        assertEquals(eosMessage.getTaskName(), taskName.getTaskName());
-        assertEquals(eosMessage.getTaskCount(), 8);
-        return null;
-      }).when(collector).send(any());
-
-    manager.sendEndOfStream(input.toSystemStream(), 8);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testPropagate() {
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition(input1, new Partition(0));
-    ssps[1] = new SystemStreamPartition(input2, new Partition(0));
-    ssps[2] = new SystemStreamPartition(ints, new Partition(0));
-
-    Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps));
-    TaskName taskName = new TaskName("task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-    MessageCollector collector = mock(MessageCollector.class);
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    EndOfStreamManager manager = spy(
-        new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector));
-    TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
-    // ssp1 end-of-stream, wait for ssp2
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator);
-    verify(manager, never()).sendEndOfStream(any(), anyInt());
-
-    // ssp2 end-of-stream, propagate to intermediate
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator);
-    doNothing().when(manager).sendEndOfStream(any(), anyInt());
-    ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class);
-    verify(manager).sendEndOfStream(argument.capture(), anyInt());
-    assertEquals(ints, argument.getValue());
-
-    // intermediate end-of-stream, shutdown the task
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator);
-    doNothing().when(coordinator).shutdown(any());
-    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-  }
-
-  //  Test the case when the publishing tasks to intermediate stream is a subset of total tasks
-  @Test
-  public void testPropogateWith2Tasks() {
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-    OutputStreamImpl outputStream = new OutputStreamImpl(outputSpec, null, null);
-    OutputOperatorSpec partitionByOp = OperatorSpecs.createPartitionByOperatorSpec(outputStream, 0);
-
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0));
-
-    TaskName t0 = new TaskName("task 0");
-    TaskName t1 = new TaskName("task 1");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName());
-    streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName());
-
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-
-    EndOfStreamManager manager0 = spy(
-        new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks, Collections.singleton(ssp1), metadataCache, null));
-    EndOfStreamManager manager1 = spy(
-        new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks, Collections.singleton(ssp2), metadataCache, null));
-
-    TaskCoordinator coordinator0 = mock(TaskCoordinator.class);
-    TaskCoordinator coordinator1 = mock(TaskCoordinator.class);
-
-    // ssp1 end-of-stream
-    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
-    doNothing().when(coordinator0).shutdown(any());
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0);
-    //verify task count is 1
-    ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class);
-    verify(manager0).sendEndOfStream(any(), argument.capture());
-    assertTrue(argument.getValue() == 1);
-    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator0).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-
-    // int1 end-of-stream
-    IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null, new EndOfStreamMessage(t0.getTaskName(), 1));
-    manager1.update(intEos, coordinator1);
-    doNothing().when(coordinator1).shutdown(any());
-    verify(manager1, never()).sendEndOfStream(any(), anyInt());
-    arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator1).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
deleted file mode 100644
index 39c56c3..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.control.IOGraph.IONode;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIOGraph {
-  StreamSpec input1;
-  StreamSpec input2;
-  StreamSpec input3;
-  StreamSpec output1;
-  StreamSpec output2;
-  StreamSpec int1;
-  StreamSpec int2;
-
-  StreamGraphImpl streamGraph;
-
-  @Before
-  public void setup() {
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "test-app");
-    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
-    Config config = new MapConfig(configMap);
-
-    /**
-     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
-     *
-     *                                    input1 -> map -> join -> output1
-     *                                                       |
-     *                      input2 -> partitionBy -> filter -|
-     *                                                       |
-     *           input3 -> filter -> partitionBy -> map -> join -> output2
-     *
-     */
-    input1 = new StreamSpec("input1", "input1", "system1");
-    input2 = new StreamSpec("input2", "input2", "system2");
-    input3 = new StreamSpec("input3", "input3", "system2");
-
-    output1 = new StreamSpec("output1", "output1", "system1");
-    output2 = new StreamSpec("output2", "output2", "system2");
-
-    runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system");
-    int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system");
-    when(runner.getStreamSpec("test-app-1-partition_by-3"))
-        .thenReturn(int1);
-    when(runner.getStreamSpec("test-app-1-partition_by-8"))
-        .thenReturn(int2);
-
-    streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
-
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
-  }
-
-  @Test
-  public void testBuildIOGraph() {
-    IOGraph ioGraph = streamGraph.toIOGraph();
-    assertEquals(ioGraph.getNodes().size(), 4);
-
-    for (IONode node : ioGraph.getNodes()) {
-      if (node.getOutput().equals(output1)) {
-        assertEquals(node.getInputs().size(), 2);
-        assertFalse(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input1);
-        assertEquals(inputs[1], int1);
-      } else if (node.getOutput().equals(output2)) {
-        assertEquals(node.getInputs().size(), 2);
-        assertFalse(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], int1);
-        assertEquals(inputs[1], int2);
-      } else if (node.getOutput().equals(int1)) {
-        assertEquals(node.getInputs().size(), 1);
-        assertTrue(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input2);
-      } else if (node.getOutput().equals(int2)) {
-        assertEquals(node.getInputs().size(), 1);
-        assertTrue(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input3);
-      }
-    }
-  }
-
-  @Test
-  public void testNodesOfInput() {
-    IOGraph ioGraph = streamGraph.toIOGraph();
-    Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    IONode node = nodes.iterator().next();
-    assertEquals(node.getOutput(), output1);
-    assertEquals(node.getInputs().size(), 2);
-    assertFalse(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(input2.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), int1);
-    assertEquals(node.getInputs().size(), 1);
-    assertTrue(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(int1.toSystemStream());
-    assertEquals(nodes.size(), 2);
-    nodes.forEach(n -> {
-        assertEquals(n.getInputs().size(), 2);
-      });
-
-    nodes = ioGraph.getNodesOfInput(input3.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), int2);
-    assertEquals(node.getInputs().size(), 1);
-    assertTrue(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(int2.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), output2);
-    assertEquals(node.getInputs().size(), 2);
-    assertFalse(node.isOutputIntermediate());
-  }
-
-  private static StreamSpec[] sort(Set<StreamSpec> specs) {
-    StreamSpec[] array = new StreamSpec[specs.size()];
-    specs.toArray(array);
-    Arrays.sort(array, (s1, s2) -> s1.getId().compareTo(s2.getId()));
-    return array;
-  }
-
-  public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs,
-      StreamSpec output,
-      boolean isOutputIntermediate) {
-    IONode node = new IONode(output, isOutputIntermediate);
-    inputs.forEach(input -> node.addInput(input));
-    return new IOGraph(Collections.singleton(node));
-  }
-}