You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:02 UTC

[1/3] incubator-apex-core git commit: APEXCORE-60 Iteration support in Apex Core

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 d0908e4bc -> b3402be5a


APEXCORE-60 Iteration support in Apex Core


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f7e1ccf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f7e1ccf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f7e1ccf1

Branch: refs/heads/devel-3
Commit: f7e1ccf14154eca92b24c5f6b5387fe56c516829
Parents: d0908e4
Author: David Yan <da...@datatorrent.com>
Authored: Wed Dec 9 15:52:26 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:03:46 2016 -0800

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |   7 +
 .../main/java/com/datatorrent/api/Operator.java |  19 +
 .../common/util/DefaultDelayOperator.java       |  75 ++++
 .../datatorrent/stram/StramLocalCluster.java    |  15 +
 .../stram/StreamingContainerManager.java        |  56 ++-
 .../datatorrent/stram/engine/GenericNode.java   | 190 +++++++---
 .../java/com/datatorrent/stram/engine/Node.java |   6 +-
 .../stram/engine/StreamingContainer.java        |   2 +
 .../stram/engine/WindowGenerator.java           |  14 +-
 .../stram/plan/logical/LogicalPlan.java         |  53 +++
 .../stram/plan/physical/PTOperator.java         |   4 +-
 .../stram/plan/physical/PhysicalPlan.java       |  19 +-
 .../stram/plan/physical/StreamMapping.java      |   4 +-
 .../java/com/datatorrent/stram/tuple/Tuple.java |   5 +
 .../stram/debug/TupleRecorderTest.java          | 208 +++++-----
 .../stram/engine/GenericNodeTest.java           |  18 +-
 .../stram/engine/GenericTestOperator.java       |   3 +
 .../stram/plan/logical/DelayOperatorTest.java   | 377 +++++++++++++++++++
 18 files changed, 888 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index ceed8a2..58bc552 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,6 +166,13 @@ public interface Context
      */
     Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
 
+    /**
+     * Attribute of input port.
+     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
+     * This is for iterative processing.
+     */
+    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
+
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index 785c60b..d4a6a90 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -99,6 +99,25 @@ public interface Operator extends Component<OperatorContext>
   }
 
   /**
+   * DelayOperator is an operator of which the outgoing streaming window id is incremented by *one* by the
+   * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must*
+   * immediately connect to an upstream operator in the data flow path. Note that at least one output port of
+   * DelayOperator should be connected in order for the DelayOperator to serve its purpose.
+   *
+   * This is meant for iterative algorithms in the topology. A larger window increment can be simulated by an
+   * implementation of this interface.
+   */
+  interface DelayOperator extends Operator
+  {
+    /**
+     * This method gets called at the first window of the execution.
+     * The implementation is expected to emit tuples for initialization and/or
+     * recovery.
+     */
+    void firstWindow();
+  }
+
+  /**
    * A operator provides ports as a means to consume and produce data tuples.
    * Concrete ports implement derived interfaces.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
new file mode 100644
index 0000000..ff676d4
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * DefaultDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
+ * port, and does a simple pass-through from the input port to the output port, while recording the tuples in memory
+ * as checkpoint state.  Subclasses of this operator can override this behavior by overriding processTuple(T tuple).
+ *
+ * Note that the engine automatically does a +1 on the output window ID since it is a DelayOperator.
+ *
+ * This DelayOperator provides no data loss during recovery, but it incurs a run-time cost per tuple, and all tuples
+ * of the checkpoint window will be part of the checkpoint state.
+ */
+public class DefaultDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
+{
+  public transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+
+  protected List<T> lastWindowTuples = new ArrayList<>();
+
+  protected void processTuple(T tuple)
+  {
+    lastWindowTuples.add(tuple);
+    output.emit(tuple);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    lastWindowTuples.clear();
+  }
+
+  @Override
+  public void firstWindow()
+  {
+    for (T tuple : lastWindowTuples) {
+      output.emit(tuple);
+    }
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 29e8e03..cda2a38 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -27,6 +27,7 @@ import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -80,6 +81,7 @@ public class StramLocalCluster implements Runnable, Controller
   private boolean appDone = false;
   private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<String, StreamingContainer>();
   private boolean heartbeatMonitoringEnabled = true;
+  private Callable<Boolean> exitCondition;
 
   public interface MockComponentFactory
   {
@@ -427,6 +429,11 @@ public class StramLocalCluster implements Runnable, Controller
     this.perContainerBufferServer = perContainerBufferServer;
   }
 
+  public void setExitCondition(Callable<Boolean> exitCondition)
+  {
+    this.exitCondition = exitCondition;
+  }
+
   @Override
   public void run()
   {
@@ -476,6 +483,14 @@ public class StramLocalCluster implements Runnable, Controller
         appDone = true;
       }
 
+      try {
+        if (exitCondition != null && exitCondition.call()) {
+          appDone = true;
+        }
+      } catch (Exception ex) {
+        break;
+      }
+
       if (Thread.interrupted()) {
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 4b79589..6233697 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1008,7 +1008,7 @@ public class StreamingContainerManager implements PlanContext
       return operatorStatus.latencyMA.getAvg();
     }
     for (PTOperator.PTInput input : maxOperator.getInputs()) {
-      if (null != input.source.source) {
+      if (null != input.source.source && !input.delay) {
         operators.add(input.source.source);
       }
     }
@@ -1896,6 +1896,19 @@ public class StreamingContainerManager implements PlanContext
 
   }
 
+  private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
+  {
+    ctx.visited.add(operator);
+    for (PTOperator.PTOutput out : operator.getOutputs()) {
+      for (PTOperator.PTInput sink : out.sinks) {
+        PTOperator sinkOperator = sink.target;
+        if (!ctx.visited.contains(sinkOperator)) {
+          addVisited(sinkOperator, ctx);
+        }
+      }
+    }
+  }
+
   /**
    * Compute checkpoints required for a given operator instance to be recovered.
    * This is done by looking at checkpoints available for downstream dependencies first,
@@ -1913,6 +1926,9 @@ public class StreamingContainerManager implements PlanContext
     if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
       // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
       if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
+        LOG.debug("Marking operator {} blocked committed window {}, recovery window {}", operator,
+            Codec.getStringWindowId(ctx.committedWindowId.longValue()),
+            Codec.getStringWindowId(operator.getRecoveryCheckpoint().windowId));
         ctx.blocked.add(operator);
       }
     }
@@ -1922,25 +1938,30 @@ public class StreamingContainerManager implements PlanContext
       long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
       maxCheckpoint = currentWindowId;
     }
+    ctx.visited.add(operator);
 
     // DFS downstream operators
-    for (PTOperator.PTOutput out : operator.getOutputs()) {
-      for (PTOperator.PTInput sink : out.sinks) {
-        PTOperator sinkOperator = sink.target;
-        if (!ctx.visited.contains(sinkOperator)) {
-          // downstream traversal
-          updateRecoveryCheckpoints(sinkOperator, ctx);
-        }
-        // recovery window id cannot move backwards
-        // when dynamically adding new operators
-        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
-          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
-        }
+    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+      addVisited(operator, ctx);
+    } else {
+      for (PTOperator.PTOutput out : operator.getOutputs()) {
+        for (PTOperator.PTInput sink : out.sinks) {
+          PTOperator sinkOperator = sink.target;
+          if (!ctx.visited.contains(sinkOperator)) {
+            // downstream traversal
+            updateRecoveryCheckpoints(sinkOperator, ctx);
+          }
+          // recovery window id cannot move backwards
+          // when dynamically adding new operators
+          if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
+            maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+          }
 
-        if (ctx.blocked.contains(sinkOperator)) {
-          if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
-            // downstream operator is blocked by this operator
-            ctx.blocked.remove(sinkOperator);
+          if (ctx.blocked.contains(sinkOperator)) {
+            if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
+              // downstream operator is blocked by this operator
+              ctx.blocked.remove(sinkOperator);
+            }
           }
         }
       }
@@ -1975,7 +1996,6 @@ public class StreamingContainerManager implements PlanContext
       LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
     }
 
-    ctx.visited.add(operator);
   }
 
   public long windowIdToMillis(long windowId)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 93cee49..4777f93 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -34,11 +34,14 @@ import com.datatorrent.api.Operator.ShutdownException;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.annotation.Stateless;
 
+import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.netlet.util.CircularBuffer;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
@@ -198,6 +201,15 @@ public class GenericNode extends Node<Operator>
     insideWindow = applicationWindowCount != 0;
   }
 
+  private boolean isInputPortConnectedToDelayOperator(String portName)
+  {
+    Operators.PortContextPair<InputPort<?>> pcPair = descriptor.inputPorts.get(portName);
+    if (pcPair == null || pcPair.context == null) {
+      return false;
+    }
+    return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+  }
+
   /**
    * Originally this method was defined in an attempt to implement the interface Runnable.
    *
@@ -212,30 +224,67 @@ public class GenericNode extends Node<Operator>
     long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
     final boolean handleIdleTime = operator instanceof IdleTimeHandler;
     int totalQueues = inputs.size();
+    int regularQueues = totalQueues;
+    // regularQueues is the number of queues that are not connected to a DelayOperator
+    for (String portName : inputs.keySet()) {
+      if (isInputPortConnectedToDelayOperator(portName)) {
+        regularQueues--;
+      }
+    }
 
-    ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
-    activeQueues.addAll(inputs.values());
+    ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
+    activeQueues.addAll(inputs.entrySet());
 
     int expectingBeginWindow = activeQueues.size();
     int receivedEndWindow = 0;
+    long firstWindowId = -1;
 
     TupleTracker tracker;
     LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
-
     try {
       do {
-        Iterator<SweepableReservoir> buffers = activeQueues.iterator();
+        Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
   activequeue:
         while (buffers.hasNext()) {
-          SweepableReservoir activePort = buffers.next();
+          Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
+          SweepableReservoir activePort = activePortEntry.getValue();
           Tuple t = activePort.sweep();
           if (t != null) {
+            boolean delay = (operator instanceof Operator.DelayOperator);
+            long windowAhead = 0;
+            if (delay) {
+              windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
+            }
             switch (t.getType()) {
               case BEGIN_WINDOW:
                 if (expectingBeginWindow == totalQueues) {
+                  // This is the first begin window tuple among all ports
+                  if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+                    // We need to wait for the first BEGIN_WINDOW from a port not connected to DelayOperator before
+                    // we can do anything with it, because otherwise if a CHECKPOINT tuple arrives from
+                    // upstream after the BEGIN_WINDOW tuple for the next window from the delay operator, it would end
+                    // up checkpointing in the middle of the window.  This code is assuming we have at least one
+                    // input port that is not connected to a DelayOperator, and we might have to change this later.
+                    // In the future, this condition will not be needed if we get rid of the CHECKPOINT tuple.
+                    continue;
+                  }
                   activePort.remove();
                   expectingBeginWindow--;
+                  receivedEndWindow = 0;
                   currentWindowId = t.getWindowId();
+                  if (delay) {
+                    if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
+                      // Buffer server code strips out the base seconds from BEGIN_WINDOW and END_WINDOW tuples for
+                      // serialization optimization.  That's why we need a reset window here to tell the buffer
+                      // server we are having a new baseSeconds now.
+                      Tuple resetWindowTuple = new ResetWindowTuple(windowAhead);
+                      for (int s = sinks.length; s-- > 0; ) {
+                        sinks[s].put(resetWindowTuple);
+                      }
+                      controlTupleCount++;
+                    }
+                    t.setWindowId(windowAhead);
+                  }
                   for (int s = sinks.length; s-- > 0; ) {
                     sinks[s].put(t);
                   }
@@ -245,7 +294,6 @@ public class GenericNode extends Node<Operator>
                     insideWindow = true;
                     operator.beginWindow(currentWindowId);
                   }
-                  receivedEndWindow = 0;
                 }
                 else if (t.getWindowId() == currentWindowId) {
                   activePort.remove();
@@ -253,17 +301,7 @@ public class GenericNode extends Node<Operator>
                 }
                 else {
                   buffers.remove();
-
-                  /* find the name of the port which got out of sequence tuple */
-                  String port = null;
-                  for (Entry<String, SweepableReservoir> e : inputs.entrySet()) {
-                    if (e.getValue() == activePort) {
-                      port = e.getKey();
-                    }
-                  }
-
-                  assert (port != null); /* we should always find the port */
-
+                  String port = activePortEntry.getKey();
                   if (PROCESSING_MODE == ProcessingMode.AT_MOST_ONCE) {
                     if (t.getWindowId() < currentWindowId) {
                       /*
@@ -279,21 +317,21 @@ public class GenericNode extends Node<Operator>
                       WindowIdActivatedReservoir wiar = new WindowIdActivatedReservoir(port, activePort, currentWindowId);
                       wiar.setSink(sink);
                       inputs.put(port, wiar);
-                      activeQueues.add(wiar);
+                      activeQueues.add(new AbstractMap.SimpleEntry<String, SweepableReservoir>(port, wiar));
                       break activequeue;
                     }
                     else {
                       expectingBeginWindow--;
                       if (++receivedEndWindow == totalQueues) {
                         processEndWindow(null);
-                        activeQueues.addAll(inputs.values());
+                        activeQueues.addAll(inputs.entrySet());
                         expectingBeginWindow = activeQueues.size();
                         break activequeue;
                       }
                     }
                   }
                   else {
-                    logger.error("Catastrophic Error: Out of sequence tuple {} on port {} while expecting {}", Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
+                    logger.error("Catastrophic Error: Out of sequence {} tuple {} on port {} while expecting {}", t.getType(), Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
                     System.exit(2);
                   }
                 }
@@ -306,8 +344,11 @@ public class GenericNode extends Node<Operator>
                   endWindowDequeueTimes.put(activePort, System.currentTimeMillis());
                   if (++receivedEndWindow == totalQueues) {
                     assert (activeQueues.isEmpty());
+                    if (delay) {
+                      t.setWindowId(windowAhead);
+                    }
                     processEndWindow(t);
-                    activeQueues.addAll(inputs.values());
+                    activeQueues.addAll(inputs.entrySet());
                     expectingBeginWindow = activeQueues.size();
                     break activequeue;
                   }
@@ -330,11 +371,12 @@ public class GenericNode extends Node<Operator>
                       doCheckpoint = true;
                     }
                   }
-
-                  for (int s = sinks.length; s-- > 0; ) {
-                    sinks[s].put(t);
+                  if (!delay) {
+                    for (int s = sinks.length; s-- > 0; ) {
+                      sinks[s].put(t);
+                    }
+                    controlTupleCount++;
                   }
-                  controlTupleCount++;
                 }
                 break;
 
@@ -343,12 +385,14 @@ public class GenericNode extends Node<Operator>
                  * we will receive tuples which are equal to the number of input streams.
                  */
                 activePort.remove();
-                buffers.remove();
+                if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+                  break; // breaking out of the switch/case
+                }
 
+                buffers.remove();
                 int baseSeconds = t.getBaseSeconds();
                 tracker = null;
-                Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
-                while (trackerIterator.hasNext()) {
+                for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
                   tracker = trackerIterator.next();
                   if (tracker.tuple.getBaseSeconds() == baseSeconds) {
                     break;
@@ -356,7 +400,7 @@ public class GenericNode extends Node<Operator>
                 }
 
                 if (tracker == null) {
-                  tracker = new TupleTracker(t, totalQueues);
+                  tracker = new TupleTracker(t, regularQueues);
                   resetTupleTracker.add(tracker);
                 }
                 int trackerIndex = 0;
@@ -364,29 +408,50 @@ public class GenericNode extends Node<Operator>
                   if (tracker.ports[trackerIndex] == null) {
                     tracker.ports[trackerIndex++] = activePort;
                     break;
-                  }
-                  else if (tracker.ports[trackerIndex] == activePort) {
+                  } else if (tracker.ports[trackerIndex] == activePort) {
                     break;
                   }
 
                   trackerIndex++;
                 }
 
-                if (trackerIndex == totalQueues) {
-                  trackerIterator = resetTupleTracker.iterator();
+                if (trackerIndex == regularQueues) {
+                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                   while (trackerIterator.hasNext()) {
                     if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                       trackerIterator.remove();
                     }
                   }
-                  for (int s = sinks.length; s-- > 0; ) {
-                    sinks[s].put(t);
+                  if (!delay) {
+                    for (int s = sinks.length; s-- > 0; ) {
+                      sinks[s].put(t);
+                    }
+                    controlTupleCount++;
                   }
-                  controlTupleCount++;
-
-                  assert (activeQueues.isEmpty());
-                  activeQueues.addAll(inputs.values());
+                  if (!activeQueues.isEmpty()) {
+                    // make sure they are all queues from DelayOperator
+                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
+                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
+                        assert (false);
+                      }
+                    }
+                    activeQueues.clear();
+                  }
+                  activeQueues.addAll(inputs.entrySet());
                   expectingBeginWindow = activeQueues.size();
+
+                  if (firstWindowId == -1) {
+                    if (delay) {
+                      for (int s = sinks.length; s-- > 0; ) {
+                        sinks[s].put(t);
+                      }
+                      controlTupleCount++;
+                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
+                      // (recovery), fabricate the first window
+                      fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+                    }
+                    firstWindowId = t.getWindowId();
+                  }
                   break activequeue;
                 }
                 break;
@@ -394,6 +459,15 @@ public class GenericNode extends Node<Operator>
               case END_STREAM:
                 activePort.remove();
                 buffers.remove();
+                if (firstWindowId == -1) {
+                  // this is for recovery from a checkpoint for DelayOperator
+                  if (delay) {
+                    // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery),
+                    // fabricate the first window
+                    fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+                  }
+                  firstWindowId = t.getWindowId();
+                }
                 for (Iterator<Entry<String, SweepableReservoir>> it = inputs.entrySet().iterator(); it.hasNext(); ) {
                   Entry<String, SweepableReservoir> e = it.next();
                   if (e.getValue() == activePort) {
@@ -409,7 +483,7 @@ public class GenericNode extends Node<Operator>
                       if (e.getKey().equals(dic.portname)) {
                         connectInputPort(dic.portname, dic.reservoir);
                         dici.remove();
-                        activeQueues.add(dic.reservoir);
+                        activeQueues.add(new AbstractMap.SimpleEntry<>(dic.portname, dic.reservoir));
                         break activequeue;
                       }
                     }
@@ -427,17 +501,18 @@ public class GenericNode extends Node<Operator>
                  * Since one of the operators we care about it gone, we should relook at our ports.
                  * We need to make sure that the END_STREAM comes outside of the window.
                  */
+                regularQueues--;
                 totalQueues--;
 
                 boolean break_activequeue = false;
-                if (totalQueues == 0) {
+                if (regularQueues == 0) {
                   alive = false;
                   break_activequeue = true;
                 }
                 else if (activeQueues.isEmpty()) {
                   assert (!inputs.isEmpty());
                   processEndWindow(null);
-                  activeQueues.addAll(inputs.values());
+                  activeQueues.addAll(inputs.entrySet());
                   expectingBeginWindow = activeQueues.size();
                   break_activequeue = true;
                 }
@@ -450,22 +525,22 @@ public class GenericNode extends Node<Operator>
                  * it's the only one which has not, then we consider it delivered and release the reset tuple downstream.
                  */
                 Tuple tuple = null;
-                for (trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
+                for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
                   tracker = trackerIterator.next();
 
                   trackerIndex = 0;
                   while (trackerIndex < tracker.ports.length) {
                     if (tracker.ports[trackerIndex] == activePort) {
-                      SweepableReservoir[] ports = new SweepableReservoir[totalQueues];
+                      SweepableReservoir[] ports = new SweepableReservoir[regularQueues];
                       System.arraycopy(tracker.ports, 0, ports, 0, trackerIndex);
-                      if (trackerIndex < totalQueues) {
+                      if (trackerIndex < regularQueues) {
                         System.arraycopy(tracker.ports, trackerIndex + 1, ports, trackerIndex, tracker.ports.length - trackerIndex - 1);
                       }
                       tracker.ports = ports;
                       break;
                     }
                     else if (tracker.ports[trackerIndex] == null) {
-                      if (trackerIndex == totalQueues) { /* totalQueues is already adjusted above */
+                      if (trackerIndex == regularQueues) { /* regularQueues is already adjusted above */
                         if (tuple == null || tuple.getBaseSeconds() < tracker.tuple.getBaseSeconds()) {
                           tuple = tracker.tuple;
                         }
@@ -475,7 +550,7 @@ public class GenericNode extends Node<Operator>
                       break;
                     }
                     else {
-                      tracker.ports = Arrays.copyOf(tracker.ports, totalQueues);
+                      tracker.ports = Arrays.copyOf(tracker.ports, regularQueues);
                     }
 
                     trackerIndex++;
@@ -485,7 +560,7 @@ public class GenericNode extends Node<Operator>
                 /*
                  * Since we were waiting for a reset tuple on this stream, we should not any longer.
                  */
-                if (tuple != null) {
+                if (tuple != null && !delay) {
                   for (int s = sinks.length; s-- > 0; ) {
                     sinks[s].put(tuple);
                   }
@@ -509,8 +584,8 @@ public class GenericNode extends Node<Operator>
         }
         else {
           boolean need2sleep = true;
-          for (SweepableReservoir cb : activeQueues) {
-            if (cb.size() > 0) {
+          for (Map.Entry<String, SweepableReservoir> cb : activeQueues) {
+            if (cb.getValue().size() > 0) {
               need2sleep = false;
               break;
             }
@@ -582,6 +657,21 @@ public class GenericNode extends Node<Operator>
 
   }
 
+  private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead)
+  {
+    Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);
+    Tuple endWindowTuple = new Tuple(MessageType.END_WINDOW, windowAhead);
+    for (Sink<Object> sink : outputs.values()) {
+      sink.put(beginWindowTuple);
+    }
+    controlTupleCount++;
+    delayOperator.firstWindow();
+    for (Sink<Object> sink : outputs.values()) {
+      sink.put(endWindowTuple);
+    }
+    controlTupleCount++;
+  }
+
   /**
    * End window dequeue times may not have been saved for all the input ports during deactivate,
    * so save them for reporting. SPOI-1324.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 068a325..d4970cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -126,6 +126,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   private ExecutorService executorService;
   private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue;
   protected Stats.CheckpointStats checkpointStats;
+  public long firstWindowMillis;
+  public long windowWidthMillis;
 
   public Node(OPERATOR operator, OperatorContext context)
   {
@@ -354,7 +356,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
 
   protected void emitEndWindow()
   {
-    EndWindowTuple ewt = new EndWindowTuple(currentWindowId);
+    long windowId = (operator instanceof Operator.DelayOperator) ?
+        WindowGenerator.getAheadWindowId(currentWindowId, firstWindowMillis, windowWidthMillis, 1) : currentWindowId;
+    EndWindowTuple ewt = new EndWindowTuple(windowId);
     for (int s = sinks.length; s-- > 0; ) {
       sinks[s].put(ewt);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 14e00a9..79d9037 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -894,6 +894,8 @@ public class StreamingContainer extends YarnContainerMain
       Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
       node.currentWindowId = ndi.checkpoint.windowId;
       node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
+      node.firstWindowMillis = firstWindowMillis;
+      node.windowWidthMillis = windowWidthMillis;
 
       node.setId(ndi.id);
       nodes.put(ndi.id, node);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 5610112..ea429af 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -314,13 +314,25 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
     long baseMillis = (windowId >> 32) * 1000;
     long diff = baseMillis - firstWindowMillis;
     long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1);
+    assert (baseChangeInterval > 0);
     long multiplier = diff / baseChangeInterval;
     if (diff % baseChangeInterval > 0) {
       multiplier++;
     }
     assert (multiplier >= 0);
     windowId = windowId & WindowGenerator.WINDOW_MASK;
-    return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis;
+    return firstWindowMillis + (multiplier * baseChangeInterval) + (windowId * windowWidthMillis);
+  }
+
+  /**
+   * Utility function to get the base seconds from a window id
+   *
+   * @param windowId
+   * @return the base seconds for the given window id
+   */
+  public static long getBaseSecondsFromWindowId(long windowId)
+  {
+    return windowId >>> 32;
   }
 
   private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 867f814..3c26118 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1758,6 +1758,14 @@ public class LogicalPlan implements Serializable, DAG
       throw new ValidationException("Loops in graph: " + cycles);
     }
 
+    List<List<String>> invalidDelays = new ArrayList<>();
+    for (OperatorMeta n : rootOperators) {
+      findInvalidDelays(n, invalidDelays);
+    }
+    if (!invalidDelays.isEmpty()) {
+      throw new ValidationException("Invalid delays in graph: " + invalidDelays);
+    }
+
     for (StreamMeta s: streams.values()) {
       if (s.source == null) {
         throw new ValidationException("Stream source not connected: " + s.getName());
@@ -1814,6 +1822,11 @@ public class LogicalPlan implements Serializable, DAG
       return;
     }
 
+    if (om.getOperator() instanceof Operator.DelayOperator) {
+      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
+      throw new ValidationException(msg);
+    }
+
     for (StreamMeta sm: om.inputStreams.values()){
       // validation fail as each input stream should be OIO
       if (sm.locality != Locality.THREAD_LOCAL){
@@ -1822,6 +1835,10 @@ public class LogicalPlan implements Serializable, DAG
         throw new ValidationException(msg);
       }
 
+      if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
+        String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
+        throw new ValidationException(msg);
+      }
       // gets oio root for input operator for the stream
       Integer oioStreamRoot = getOioRoot(sm.source.operatorMeta);
 
@@ -1895,6 +1912,11 @@ public class LogicalPlan implements Serializable, DAG
     // depth first successors traversal
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink: downStream.sinks) {
+        if (om.getOperator() instanceof Operator.DelayOperator) {
+          // this is an iteration loop, do not treat it as downstream when detecting cycles
+          sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+          continue;
+        }
         OperatorMeta successor = sink.getOperatorWrapper();
         if (successor == null) {
           continue;
@@ -1932,6 +1954,37 @@ public class LogicalPlan implements Serializable, DAG
     }
   }
 
+  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+  {
+    stack.push(om);
+
+    // depth first successors traversal
+    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
+    if (isDelayOperator) {
+      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
+        LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
+        invalidDelays.add(Collections.singletonList(om.getName()));
+      }
+    }
+
+    for (StreamMeta downStream: om.outputStreams.values()) {
+      for (InputPortMeta sink : downStream.sinks) {
+        OperatorMeta successor = sink.getOperatorWrapper();
+        if (isDelayOperator) {
+          // Check whether all downstream operators are already visited in the path
+          if (successor != null && !stack.contains(successor)) {
+            LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
+                om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName());
+            invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
+          }
+        } else {
+          findInvalidDelays(successor, invalidDelays);
+        }
+      }
+    }
+    stack.pop();
+  }
+
   private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited)
   {
     for (StreamMeta is : om.getInputStreams().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index 6adfd64..ae276d8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -81,6 +81,7 @@ public class PTOperator implements java.io.Serializable
     public final PartitionKeys partitions;
     public final PTOutput source;
     public final String portName;
+    public final boolean delay;
 
     /**
      *
@@ -90,7 +91,7 @@ public class PTOperator implements java.io.Serializable
      * @param partitions
      * @param source
      */
-    protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source)
+    protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source, boolean delay)
     {
       this.logicalStream = logicalStream;
       this.target = target;
@@ -98,6 +99,7 @@ public class PTOperator implements java.io.Serializable
       this.source = source;
       this.portName = portName;
       this.source.sinks.add(this);
+      this.delay = delay;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 829a6fd..da96ef3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -328,8 +328,11 @@ public class PhysicalPlan implements Serializable
 
       boolean upstreamDeployed = true;
 
-      for (StreamMeta s : n.getInputStreams().values()) {
-        if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
+      for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) {
+        StreamMeta s = entry.getValue();
+        boolean delay = entry.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
+        // skip delay sources since it's going to be handled as downstream
+        if (!delay && s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
           pendingNodes.push(n);
           pendingNodes.push(s.getSource().getOperatorMeta());
           upstreamDeployed = false;
@@ -907,7 +910,10 @@ public class PhysicalPlan implements Serializable
 
     for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) {
       PMapping sourceMapping = this.logicalToPTOperator.get(ipm.getValue().getSource().getOperatorMeta());
-
+      if (ipm.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+        // skip if the source is a DelayOperator
+        continue;
+      }
       if (ipm.getKey().getValue(PortContext.PARTITION_PARALLEL)) {
         if (sourceMapping.partitions.size() < m.partitions.size()) {
           throw new AssertionError("Number of partitions don't match in parallel mapping " + sourceMapping.logicalOperator.getName() + " -> " + m.logicalOperator.getName() + ", " + sourceMapping.partitions.size() + " -> " + m.partitions.size());
@@ -942,11 +948,11 @@ public class PhysicalPlan implements Serializable
                 PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                   sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                 StreamMapping.addInput(slidingUnifier, sourceOut, null);
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
                 sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
               }
               else {
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
               }
               oper.inputs.add(input);
             }
@@ -1445,6 +1451,9 @@ public class PhysicalPlan implements Serializable
     PMapping upstreamPartitioned = null;
 
     for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> e : om.getInputStreams().entrySet()) {
+      if (e.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+        continue;
+      }
       PMapping m = logicalToPTOperator.get(e.getValue().getSource().getOperatorMeta());
       if (e.getKey().getValue(PortContext.PARTITION_PARALLEL).equals(true)) {
         // operator partitioned with upstream

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index d42c327..91c6eef 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
     // link to upstream output(s) for this stream
     for (PTOutput upstreamOut : sourceOper.outputs) {
       if (upstreamOut.logicalStream == streamMeta) {
-        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut);
+        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
         oper.inputs.add(input);
       }
     }
@@ -356,7 +356,7 @@ public class StreamMapping implements java.io.Serializable
   public static void addInput(PTOperator target, PTOutput upstreamOut, PartitionKeys pks)
   {
     StreamMeta lStreamMeta = upstreamOut.logicalStream;
-    PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut);
+    PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut, false);
     target.inputs.add(input);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
index 23c197b..9191b65 100644
--- a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
+++ b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
@@ -52,6 +52,11 @@ public class Tuple
     return windowId;
   }
 
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
   public final int getBaseSeconds()
   {
     return (int)(windowId >> 32);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 3f97b54..1c17d68 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -76,8 +76,7 @@ public class TupleRecorderTest
   public TupleRecorder getTupleRecorder(final StramLocalCluster localCluster, final PTOperator op)
   {
     TupleRecorderCollection instance = (TupleRecorderCollection)localCluster.getContainer(op).getInstance(classname);
-    TupleRecorder tupleRecorder = instance.getTupleRecorder(op.getId(), null);
-    return tupleRecorder;
+    return instance.getTupleRecorder(op.getId(), null);
   }
 
   public class Tuple
@@ -89,8 +88,7 @@ public class TupleRecorderTest
   @Test
   public void testRecorder() throws IOException
   {
-    FileSystem fs = new LocalFileSystem();
-    try {
+    try (FileSystem fs = new LocalFileSystem()) {
       TupleRecorder recorder = new TupleRecorder(null, "application_test_id_1");
       recorder.getStorage().setBytesPerPartFile(4096);
       recorder.getStorage().setLocalMode(true);
@@ -132,80 +130,76 @@ public class TupleRecorderTest
 
       fs.initialize((new Path(recorder.getStorage().getBasePath()).toUri()), new Configuration());
       Path path;
-      FSDataInputStream is;
       String line;
-      BufferedReader br;
 
       path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.INDEX_FILE);
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
-
-      line = br.readLine();
-      //    Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
-      Assert.assertTrue("check index", line.matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
 
+        line = br.readLine();
+        //    Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
+        Assert.assertTrue("check index", line
+            .matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+      }
       path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.META_FILE);
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
-
-      ObjectMapper mapper = new ObjectMapper();
-      line = br.readLine();
-      Assert.assertEquals("check version", "1.2", line);
-      br.readLine(); // RecordInfo
-      //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
-      line = br.readLine();
-      PortInfo pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
-      //line = br.readLine();
-
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+
+        ObjectMapper mapper = new ObjectMapper();
+        line = br.readLine();
+        Assert.assertEquals("check version", "1.2", line);
+        br.readLine(); // RecordInfo
+        //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
+        line = br.readLine();
+        PortInfo pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
+        //line = br.readLine();
+      }
       path = new Path(recorder.getStorage().getBasePath(), "part0.txt");
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
 
-      line = br.readLine();
-      Assert.assertTrue("check part0", line.startsWith("B:"));
-      Assert.assertTrue("check part0", line.endsWith(":1000"));
+        line = br.readLine();
+        Assert.assertTrue("check part0", line.startsWith("B:"));
+        Assert.assertTrue("check part0", line.endsWith(":1000"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 1", line.startsWith("T:"));
-      Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 1", line.startsWith("T:"));
+        Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 2", line.startsWith("T:"));
-      Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 2", line.startsWith("T:"));
+        Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 3", line.startsWith("T:"));
-      Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 3", line.startsWith("T:"));
+        Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 4", line.startsWith("T:"));
-      Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 4", line.startsWith("T:"));
+        Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 5", line.startsWith("E:"));
-      Assert.assertTrue("check part0 5", line.endsWith(":1000"));
-    }
-    catch (IOException ex) {
+        line = br.readLine();
+        Assert.assertTrue("check part0 5", line.startsWith("E:"));
+        Assert.assertTrue("check part0 5", line.endsWith(":1000"));
+      }
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
-    finally {
-      fs.close();
-    }
   }
 
   private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
@@ -234,17 +228,17 @@ public class TupleRecorderTest
     final PTOperator ptOp2 = localCluster.findByLogicalNode(dag.getMeta(op2));
     StramTestSupport.waitForActivation(localCluster, ptOp2);
 
-    testRecordingOnOperator(localCluster, ptOp2, 2);
+    testRecordingOnOperator(localCluster, ptOp2);
 
     final PTOperator ptOp1 = localCluster.findByLogicalNode(dag.getMeta(op1));
     StramTestSupport.waitForActivation(localCluster, ptOp1);
 
-    testRecordingOnOperator(localCluster, ptOp1, 1);
+    testRecordingOnOperator(localCluster, ptOp1);
 
     localCluster.shutdown();
   }
 
-  private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op, int numPorts) throws Exception
+  private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op) throws Exception
   {
     String id = "xyz";
     localCluster.getStreamingContainerManager().startRecording(id, op.getId(), null, 0);
@@ -259,25 +253,30 @@ public class TupleRecorderTest
 
     };
     Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(c, 10000));
-    TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
+    final TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
     long startTime = tupleRecorder.getStartTime();
-    BufferedReader br;
     String line;
     File dir = new File(testWorkDir, "recordings/" + op.getId() + "/" + id);
     File file;
 
-    file = new File(dir, "meta.txt");
+    file = new File(dir, FSPartFileCollection.META_FILE);
     Assert.assertTrue("meta file should exist", file.exists());
-    br = new BufferedReader(new FileReader(file));
-    line = br.readLine();
-    Assert.assertEquals("version should be 1.2", "1.2", line);
-    line = br.readLine();
-    JSONObject json = new JSONObject(line);
-    Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
-    
-    for (int i = 0; i < numPorts; i++) {
+    int numPorts = tupleRecorder.getSinkMap().size();
+
+    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
       line = br.readLine();
-      Assert.assertTrue("should contain name, streamName, type and id", line != null && line.contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line.contains("\"id\""));
+      Assert.assertEquals("version should be 1.2", "1.2", line);
+      line = br.readLine();
+      JSONObject json = new JSONObject(line);
+      Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
+      Assert.assertTrue(numPorts > 0);
+
+      for (int i = 0; i < numPorts; i++) {
+        line = br.readLine();
+        Assert.assertTrue("should contain name, streamName, type and id", line != null && line
+            .contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line
+            .contains("\"id\""));
+      }
     }
 
     c = new WaitCondition()
@@ -285,7 +284,6 @@ public class TupleRecorderTest
       @Override
       public boolean isComplete()
       {
-        TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
         return (tupleRecorder.getTotalTupleCount() >= testTupleCount);
       }
 
@@ -306,24 +304,23 @@ public class TupleRecorderTest
     };
     Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(c, 5000));
 
-    file = new File(dir, "index.txt");
+    file = new File(dir, FSPartFileCollection.INDEX_FILE);
     Assert.assertTrue("index file should exist", file.exists());
-    br = new BufferedReader(new FileReader(file));
 
-    ArrayList<String> partFiles = new ArrayList<String>();
+    ArrayList<String> partFiles = new ArrayList<>();
     int indexCount = 0;
-    while ((line = br.readLine()) != null) {
-      String partFile = "part" + indexCount + ".txt";
-      if (line.startsWith("F:" + partFile + ":")) {
-        partFiles.add(partFile);
-        indexCount++;
-      }
-      else if (line.startsWith("E")) {
-        Assert.assertEquals("index file should end after E line", br.readLine(), null);
-        break;
-      }
-      else {
-        Assert.fail("index file line is not starting with F or E");
+    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+      while ((line = br.readLine()) != null) {
+        String partFile = "part" + indexCount + ".txt";
+        if (line.startsWith("F:" + partFile + ":")) {
+          partFiles.add(partFile);
+          indexCount++;
+        } else if (line.startsWith("E")) {
+          Assert.assertEquals("index file should end after E line", br.readLine(), null);
+          break;
+        } else {
+          Assert.fail("index file line is not starting with F or E");
+        }
       }
     }
 
@@ -337,17 +334,16 @@ public class TupleRecorderTest
         Assert.assertTrue(partFile + " should be greater than 1KB", file.length() >= 1024);
       }
       Assert.assertTrue(partFile + " should exist", file.exists());
-      br = new BufferedReader(new FileReader(file));
-      while ((line = br.readLine()) != null) {
-        if (line.startsWith("B:")) {
-          beginWindowExists = true;
-        }
-        else if (line.startsWith("E:")) {
-          endWindowExists = true;
-        }
-        else if (line.startsWith("T:")) {
-          String[] parts = line.split(":");
-          tupleCount[Integer.valueOf(parts[2])]++;
+      try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+        while ((line = br.readLine()) != null) {
+          if (line.startsWith("B:")) {
+            beginWindowExists = true;
+          } else if (line.startsWith("E:")) {
+            endWindowExists = true;
+          } else if (line.startsWith("T:")) {
+            String[] parts = line.split(":");
+            tupleCount[Integer.valueOf(parts[2])]++;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index c7e8ccc..2577504 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -277,6 +277,8 @@ public class GenericNodeTest
     gn.connectInputPort("ip1", reservoir1);
     gn.connectInputPort("ip2", reservoir2);
     gn.connectOutputPort("op", output);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
 
     final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
@@ -382,6 +384,8 @@ public class GenericNodeTest
     gn.connectInputPort("ip1", reservoir1);
     gn.connectInputPort("ip2", reservoir2);
     gn.connectOutputPort("op", Sink.BLACKHOLE);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
 
     final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
@@ -493,6 +497,8 @@ public class GenericNodeTest
 
     in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
     in.connectOutputPort("output", testSink);
+    in.firstWindowMillis = 0;
+    in.windowWidthMillis = 100;
 
     windowGenerator.activate(null);
 
@@ -551,9 +557,13 @@ public class GenericNodeTest
     final long sleepTime = 25L;
 
     WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
-    windowGenerator.setResetWindow(0L);
-    windowGenerator.setFirstWindow(1448909287863L);
-    windowGenerator.setWindowWidth(100);
+    long resetWindow = 0L;
+    long firstWindowMillis = 1448909287863L;
+    int windowWidth = 100;
+
+    windowGenerator.setResetWindow(resetWindow);
+    windowGenerator.setFirstWindow(firstWindowMillis);
+    windowGenerator.setWindowWidth(windowWidth);
     windowGenerator.setCheckpointCount(1, 0);
 
     GenericOperator go = new GenericOperator();
@@ -576,6 +586,8 @@ public class GenericNodeTest
 
     gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024));
     gn.connectOutputPort("output", testSink);
+    gn.firstWindowMillis = firstWindowMillis;
+    gn.windowWidthMillis = windowWidth;
 
     windowGenerator.activate(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
index 0c8ae62..a3b0c53 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
@@ -132,6 +132,9 @@ public class GenericTestOperator extends BaseOperator {
     if (outport1.isConnected()) {
       outport1.emit(o);
     }
+    if (outport2.isConnected()) {
+      outport2.emit(o);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
new file mode 100644
index 0000000..359da17
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.validation.ValidationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for topologies with delay operator
+ */
+public class DelayOperatorTest
+{
+  private static Lock sequential = new ReentrantLock();
+
+  @Before
+  public void setup()
+  {
+    sequential.lock();
+  }
+
+  @After
+  public void teardown()
+  {
+    sequential.unlock();
+  }
+
+  @Test
+  public void testInvalidDelayDetection()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToD", opDelay.output, opD.inport2);
+
+    List<List<String>> invalidDelays = new ArrayList<>();
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+
+    dag = new LogicalPlan();
+
+    opB = dag.addOperator("B", GenericTestOperator.class);
+    opC = dag.addOperator("C", GenericTestOperator.class);
+    opD = dag.addOperator("D", GenericTestOperator.class);
+    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    dag.setAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToC", opDelay.output, opC.inport2);
+
+    invalidDelays = new ArrayList<>();
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+
+    dag = new LogicalPlan();
+
+    opB = dag.addOperator("B", GenericTestOperator.class);
+    opC = dag.addOperator("C", GenericTestOperator.class);
+    opD = dag.addOperator("D", GenericTestOperator.class);
+    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
+    dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testValidDelay()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.validate();
+  }
+
+  public static final Long[] FIBONACCI_NUMBERS = new Long[]{
+      1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L,
+      10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L,
+      3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
+  };
+
+  public static class FibonacciOperator extends BaseOperator
+  {
+    public static List<Long> results = new ArrayList<>();
+    public long currentNumber = 1;
+    private transient long tempNum;
+
+    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+      }
+    };
+    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+    {
+      @Override
+      public void process(Long tuple)
+      {
+        tempNum = tuple;
+      }
+    };
+    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    @Override
+    public void endWindow()
+    {
+      output.emit(currentNumber);
+      results.add(currentNumber);
+      currentNumber += tempNum;
+      if (currentNumber <= 0) {
+        // overflow
+        currentNumber = 1;
+      }
+    }
+
+  }
+
+  public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
+  {
+    private boolean committed = false;
+    private int simulateFailureWindows = 0;
+    private boolean simulateFailureAfterCommit = false;
+    private int windowCount = 0;
+    public static boolean failureSimulated = false;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+          !failureSimulated) {
+        if (windowCount++ == simulateFailureWindows) {
+          failureSimulated = true;
+          throw new RuntimeException("simulating failure");
+        }
+      }
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+      committed = true;
+    }
+
+    public void setSimulateFailureWindows(int windows, boolean afterCommit)
+    {
+      this.simulateFailureAfterCommit = afterCommit;
+      this.simulateFailureWindows = windows;
+    }
+  }
+
+  public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
+  {
+    private boolean committed = false;
+    private int simulateFailureWindows = 0;
+    private boolean simulateFailureAfterCommit = false;
+    private int windowCount = 0;
+    private static boolean failureSimulated = false;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+          !failureSimulated) {
+        if (windowCount++ == simulateFailureWindows) {
+          failureSimulated = true;
+          throw new RuntimeException("simulating failure");
+        }
+      }
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+      committed = true;
+    }
+
+    public void setSimulateFailureWindows(int windows, boolean afterCommit)
+    {
+      this.simulateFailureAfterCommit = afterCommit;
+      this.simulateFailureWindows = windows;
+    }
+  }
+
+
+  @Test
+  public void testFibonacci() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    FibonacciOperator.results.clear();
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FibonacciOperator.results.size() >= 10;
+      }
+    });
+    localCluster.run(10000);
+    Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
+        FibonacciOperator.results.subList(0, 10).toArray());
+  }
+
+  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
+  @Test
+  public void testFibonacciRecovery1() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    fib.setSimulateFailureWindows(3, true);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    FailableFibonacciOperator.results.clear();
+    FailableFibonacciOperator.failureSimulated = false;
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setPerContainerBufferServer(true);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FailableFibonacciOperator.results.size() >= 30;
+      }
+    });
+    localCluster.run(60000);
+    Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
+    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+  }
+
+  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
+  @Test
+  public void testFibonacciRecovery2() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
+
+    opDelay.setSimulateFailureWindows(5, true);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    FibonacciOperator.results.clear();
+    FailableDelayOperator.failureSimulated = false;
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setPerContainerBufferServer(true);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FibonacciOperator.results.size() >= 30;
+      }
+    });
+    localCluster.run(60000);
+
+    Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
+    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+  }
+
+
+}


[3/3] incubator-apex-core git commit: APEXCORE-306 Update checkpoints for strongly connected operators as group.

Posted by th...@apache.org.
APEXCORE-306 Update checkpoints for strongly connected operators as group.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3402be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3402be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3402be5

Branch: refs/heads/devel-3
Commit: b3402be5a45728515f4a8328fec5a76ddede0350
Parents: 4d5828c
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Jan 21 16:39:55 2016 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:24:31 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 161 +++++++++++++------
 .../com/datatorrent/stram/api/Checkpoint.java   |  11 ++
 .../stram/plan/logical/LogicalPlan.java         |  91 +++++++----
 .../com/datatorrent/stram/CheckpointTest.java   |   3 +-
 .../stram/plan/logical/DelayOperatorTest.java   |  88 +++++++++-
 .../stram/plan/logical/LogicalPlanTest.java     | 131 ++++++++++-----
 6 files changed, 358 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6233697..a687a37 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -35,6 +35,7 @@ import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -159,6 +160,7 @@ public class StreamingContainerManager implements PlanContext
   private long lastResourceRequest = 0;
   private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
   private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+  private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
   private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
   private CriticalPathInfo criticalPathInfo;
   private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
@@ -812,6 +814,7 @@ public class StreamingContainerManager implements PlanContext
     Collection<OperatorMeta> logicalOperators = getLogicalPlan().getAllOperators();
     //for backward compatibility
     for (OperatorMeta operatorMeta : logicalOperators) {
+      @SuppressWarnings("deprecation")
       Context.CountersAggregator aggregator = operatorMeta.getValue(OperatorContext.COUNTERS_AGGREGATOR);
       if (aggregator == null) {
         continue;
@@ -825,6 +828,7 @@ public class StreamingContainerManager implements PlanContext
         }
       }
       if (counters.size() > 0) {
+        @SuppressWarnings("deprecation")
         Object aggregate = aggregator.aggregate(counters);
         latestLogicalCounters.put(operatorMeta.getName(), aggregate);
       }
@@ -857,6 +861,8 @@ public class StreamingContainerManager implements PlanContext
         if (windowMetrics == null) {
           windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE)
           {
+            private static final long serialVersionUID = 1L;
+
             @Override
             public boolean add(Pair<Long, Map<String, Object>> longMapPair)
             {
@@ -1134,7 +1140,7 @@ public class StreamingContainerManager implements PlanContext
     cs.container.setAllocatedVCores(0);
 
     // resolve dependencies
-    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
     for (PTOperator oper : cs.container.getOperators()) {
       updateRecoveryCheckpoints(oper, ctx);
     }
@@ -1881,31 +1887,18 @@ public class StreamingContainerManager implements PlanContext
     public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
     public final long currentTms;
     public final boolean recovery;
+    public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
 
     public UpdateCheckpointsContext(Clock clock)
     {
-      this.currentTms = clock.getTime();
-      this.recovery = false;
+      this(clock, false, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap());
     }
 
-    public UpdateCheckpointsContext(Clock clock, boolean recovery)
+    public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups)
     {
       this.currentTms = clock.getTime();
       this.recovery = recovery;
-    }
-
-  }
-
-  private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
-  {
-    ctx.visited.add(operator);
-    for (PTOperator.PTOutput out : operator.getOutputs()) {
-      for (PTOperator.PTInput sink : out.sinks) {
-        PTOperator sinkOperator = sink.target;
-        if (!ctx.visited.contains(sinkOperator)) {
-          addVisited(sinkOperator, ctx);
-        }
-      }
+      this.checkpointGroups = checkpointGroups;
     }
   }
 
@@ -1933,20 +1926,55 @@ public class StreamingContainerManager implements PlanContext
       }
     }
 
-    long maxCheckpoint = operator.getRecentCheckpoint().windowId;
-    if (ctx.recovery && maxCheckpoint == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
-      long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
-      maxCheckpoint = currentWindowId;
+    // the most recent checkpoint eligible for recovery based on downstream state
+    Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
+
+    Set<OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta());
+    if (checkpointGroup == null) {
+      checkpointGroup = Collections.singleton(operator.getOperatorMeta());
+    }
+    // find intersection of checkpoints that group can collectively move to
+    TreeSet<Checkpoint> commonCheckpoints = new TreeSet<>(new Checkpoint.CheckpointComparator());
+    synchronized (operator.checkpoints) {
+      commonCheckpoints.addAll(operator.checkpoints);
+    }
+    Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+    if (checkpointGroup.size() > 1) {
+      for (OperatorMeta om : checkpointGroup) {
+        Collection<PTOperator> operators = plan.getAllOperators(om);
+        for (PTOperator groupOper : operators) {
+          synchronized (groupOper.checkpoints) {
+            commonCheckpoints.retainAll(groupOper.checkpoints);
+          }
+          // visit all downstream operators of the group
+          ctx.visited.add(groupOper);
+          groupOpers.add(groupOper);
+        }
+      }
+      // highest common checkpoint
+      if (!commonCheckpoints.isEmpty()) {
+        maxCheckpoint = commonCheckpoints.last();
+      }
+    } else {
+      // without logical grouping, treat partitions as independent
+      // this is especially important for parallel partitioning
+      ctx.visited.add(operator);
+      groupOpers.add(operator);
+      maxCheckpoint = operator.getRecentCheckpoint();
+      if (ctx.recovery && maxCheckpoint.windowId == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
+        long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
+        maxCheckpoint = new Checkpoint(currentWindowId, 0, 0);
+      }
     }
-    ctx.visited.add(operator);
 
     // DFS downstream operators
-    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
-      addVisited(operator, ctx);
-    } else {
-      for (PTOperator.PTOutput out : operator.getOutputs()) {
+    for (PTOperator groupOper : groupOpers) {
+      for (PTOperator.PTOutput out : groupOper.getOutputs()) {
         for (PTOperator.PTInput sink : out.sinks) {
           PTOperator sinkOperator = sink.target;
+          if (groupOpers.contains(sinkOperator)) {
+            continue; // downstream operator within group
+          }
           if (!ctx.visited.contains(sinkOperator)) {
             // downstream traversal
             updateRecoveryCheckpoints(sinkOperator, ctx);
@@ -1954,7 +1982,7 @@ public class StreamingContainerManager implements PlanContext
           // recovery window id cannot move backwards
           // when dynamically adding new operators
           if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
-            maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+            maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint());
           }
 
           if (ctx.blocked.contains(sinkOperator)) {
@@ -1967,33 +1995,43 @@ public class StreamingContainerManager implements PlanContext
       }
     }
 
-    // checkpoint frozen during deployment
-    if (ctx.recovery || operator.getState() != PTOperator.State.PENDING_DEPLOY) {
-      // remove previous checkpoints
-      Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
-      synchronized (operator.checkpoints) {
-        if (!operator.checkpoints.isEmpty() && (operator.checkpoints.getFirst()).windowId <= maxCheckpoint) {
-          c1 = operator.checkpoints.getFirst();
-          Checkpoint c2;
-          while (operator.checkpoints.size() > 1 && ((c2 = operator.checkpoints.get(1)).windowId) <= maxCheckpoint) {
-            operator.checkpoints.removeFirst();
-            //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
-            this.purgeCheckpoints.add(new Pair<PTOperator, Long>(operator, c1.windowId));
-            c1 = c2;
+    // find the common checkpoint that is <= downstream recovery checkpoint
+    if (!commonCheckpoints.contains(maxCheckpoint)) {
+      if (!commonCheckpoints.isEmpty()) {
+        maxCheckpoint = Objects.firstNonNull(commonCheckpoints.floor(maxCheckpoint), maxCheckpoint);
+      }
+    }
+
+    for (PTOperator groupOper : groupOpers) {
+      // checkpoint frozen during deployment
+      if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+        // remove previous checkpoints
+        Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
+        LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;
+        synchronized (checkpoints) {
+          if (!checkpoints.isEmpty() && (checkpoints.getFirst()).windowId <= maxCheckpoint.windowId) {
+            c1 = checkpoints.getFirst();
+            Checkpoint c2;
+            while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) {
+              checkpoints.removeFirst();
+              //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
+              this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId));
+              c1 = c2;
+            }
           }
-        }
-        else {
-          if (ctx.recovery && operator.checkpoints.isEmpty() && operator.isOperatorStateLess()) {
-            LOG.debug("Adding checkpoint for stateless operator {} {}", operator, Codec.getStringWindowId(maxCheckpoint));
-            c1 = operator.addCheckpoint(maxCheckpoint, this.vars.windowStartMillis);
+          else {
+            if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
+              LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId));
+              c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
+            }
           }
         }
+        //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
+        groupOper.setRecoveryCheckpoint(c1);
+      }
+      else {
+        LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState());
       }
-      //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
-      operator.setRecoveryCheckpoint(c1);
-    }
-    else {
-      LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
     }
 
   }
@@ -2009,13 +2047,32 @@ public class StreamingContainerManager implements PlanContext
     return this.vars.windowStartMillis;
   }
 
+  private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+  {
+    if (this.checkpointGroups == null) {
+      this.checkpointGroups = new HashMap<>();
+      LogicalPlan dag = this.plan.getLogicalPlan();
+      dag.resetNIndex();
+      LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+      for (OperatorMeta om : dag.getRootOperators()) {
+        this.plan.getLogicalPlan().findStronglyConnected(om, vc);
+      }
+      for (Set<OperatorMeta> checkpointGroup : vc.stronglyConnected) {
+        for (OperatorMeta om : checkpointGroup) {
+          this.checkpointGroups.put(om, checkpointGroup);
+        }
+      }
+    }
+    return checkpointGroups;
+  }
+
   /**
    * Visit all operators to update current checkpoint based on updated downstream state.
    * Purge older checkpoints that are no longer needed.
    */
   private long updateCheckpoints(boolean recovery)
   {
-    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery);
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
     for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
       //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
       List<PTOperator> operators = plan.getOperators(logicalOperator);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
index 5ec4a7e..d24b17a 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.stram.api;
 
+import java.util.Comparator;
+
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.bufferserver.util.Codec;
 
@@ -102,6 +104,15 @@ public class Checkpoint implements com.datatorrent.api.Stats.Checkpoint
     return windowId;
   }
 
+  public static class CheckpointComparator implements Comparator<Checkpoint>
+  {
+    @Override
+    public int compare(Checkpoint o1, Checkpoint o2)
+    {
+      return Long.compare(o1.windowId, o2.windowId);
+    }
+  }
+
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   public static final Checkpoint INITIAL_CHECKPOINT = new Checkpoint(Stateless.WINDOW_ID, 0, 0);
   private static final long serialVersionUID = 201402152116L;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 883ad71..6d7ebe1 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -159,8 +159,6 @@ public class LogicalPlan implements Serializable, DAG
   public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
   private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
-  private transient int nodeIndex = 0; // used for cycle validation
-  private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation
   private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
 
   @Override
@@ -1540,6 +1538,7 @@ public class LogicalPlan implements Serializable, DAG
     return this.operators.get(operatorName);
   }
 
+  @Override
   public ModuleMeta getModuleMeta(String moduleName)
   {
     return this.modules.get(moduleName);
@@ -1557,6 +1556,7 @@ public class LogicalPlan implements Serializable, DAG
     throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
   }
 
+  @Override
   public ModuleMeta getMeta(Module module)
   {
     for (ModuleMeta m : getAllModules()) {
@@ -1626,6 +1626,24 @@ public class LogicalPlan implements Serializable, DAG
     return classNames;
   }
 
+  public static class ValidationContext
+  {
+    public int nodeIndex = 0;
+    public Stack<OperatorMeta> stack = new Stack<OperatorMeta>();
+    public Stack<OperatorMeta> path = new Stack<OperatorMeta>();
+    public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>();
+    public OperatorMeta invalidLoopAt;
+    public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>();
+  }
+
+  public void resetNIndex()
+  {
+    for (OperatorMeta om : getAllOperators()) {
+      om.lowlink = null;
+      om.nindex = null;
+    }
+  }
+
   /**
    * Validate the plan. Includes checks that required ports are connected,
    * required configuration parameters specified, graph free of cycles etc.
@@ -1752,21 +1770,20 @@ public class LogicalPlan implements Serializable, DAG
         throw new ValidationException("At least one output port must be connected: " + n.name);
       }
     }
-    stack = new Stack<OperatorMeta>();
 
-    List<List<String>> cycles = new ArrayList<List<String>>();
+    ValidationContext validatonContext = new ValidationContext();
     for (OperatorMeta n: operators.values()) {
       if (n.nindex == null) {
-        findStronglyConnected(n, cycles);
+        findStronglyConnected(n, validatonContext);
       }
     }
-    if (!cycles.isEmpty()) {
-      throw new ValidationException("Loops in graph: " + cycles);
+    if (!validatonContext.invalidCycles.isEmpty()) {
+      throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles);
     }
 
     List<List<String>> invalidDelays = new ArrayList<>();
     for (OperatorMeta n : rootOperators) {
-      findInvalidDelays(n, invalidDelays);
+      findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
     }
     if (!invalidDelays.isEmpty()) {
       throw new ValidationException("Invalid delays in graph: " + invalidDelays);
@@ -1908,59 +1925,72 @@ public class LogicalPlan implements Serializable, DAG
    * @param om
    * @param cycles
    */
-  public void findStronglyConnected(OperatorMeta om, List<List<String>> cycles)
+  public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
   {
-    om.nindex = nodeIndex;
-    om.lowlink = nodeIndex;
-    nodeIndex++;
-    stack.push(om);
+    om.nindex = ctx.nodeIndex;
+    om.lowlink = ctx.nodeIndex;
+    ctx.nodeIndex++;
+    ctx.stack.push(om);
+    ctx.path.push(om);
 
     // depth first successors traversal
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink: downStream.sinks) {
-        if (om.getOperator() instanceof Operator.DelayOperator) {
-          // this is an iteration loop, do not treat it as downstream when detecting cycles
-          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
-          continue;
-        }
         OperatorMeta successor = sink.getOperatorWrapper();
         if (successor == null) {
           continue;
         }
         // check for self referencing node
         if (om == successor) {
-          cycles.add(Collections.singletonList(om.name));
+          ctx.invalidCycles.add(Collections.singleton(om));
         }
         if (successor.nindex == null) {
           // not visited yet
-          findStronglyConnected(successor, cycles);
+          findStronglyConnected(successor, ctx);
           om.lowlink = Math.min(om.lowlink, successor.lowlink);
         }
-        else if (stack.contains(successor)) {
+        else if (ctx.stack.contains(successor)) {
           om.lowlink = Math.min(om.lowlink, successor.nindex);
+          boolean isDelayLoop = false;
+          for (int i=ctx.path.size(); i>0; i--) {
+            OperatorMeta om2 = ctx.path.get(i-1);
+            if (om2.getOperator() instanceof Operator.DelayOperator) {
+              isDelayLoop = true;
+            }
+            if (om2 == successor) {
+              break;
+            }
+          }
+          if (!isDelayLoop) {
+            ctx.invalidLoopAt = successor;
+          }
         }
       }
     }
 
     // pop stack for all root operators
     if (om.lowlink.equals(om.nindex)) {
-      List<String> connectedIds = new ArrayList<String>();
-      while (!stack.isEmpty()) {
-        OperatorMeta n2 = stack.pop();
-        connectedIds.add(n2.name);
+      Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size());
+      while (!ctx.stack.isEmpty()) {
+        OperatorMeta n2 = ctx.stack.pop();
+        connectedSet.add(n2);
         if (n2 == om) {
           break; // collected all connected operators
         }
       }
       // strongly connected (cycle) if more than one node in stack
-      if (connectedIds.size() > 1) {
-        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
-        cycles.add(connectedIds);
+      if (connectedSet.size() > 1) {
+        ctx.stronglyConnected.add(connectedSet);
+        if (connectedSet.contains(ctx.invalidLoopAt)) {
+          ctx.invalidCycles.add(connectedSet);
+        }
       }
     }
+    ctx.path.pop();
+
   }
 
-  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack)
   {
     stack.push(om);
 
@@ -1977,6 +2007,7 @@ public class LogicalPlan implements Serializable, DAG
       for (InputPortMeta sink : downStream.sinks) {
         OperatorMeta successor = sink.getOperatorWrapper();
         if (isDelayOperator) {
+          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
           // Check whether all downstream operators are already visited in the path
           if (successor != null && !stack.contains(successor)) {
             LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
@@ -1984,7 +2015,7 @@ public class LogicalPlan implements Serializable, DAG
             invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
           }
         } else {
-          findInvalidDelays(successor, invalidDelays);
+          findInvalidDelays(successor, invalidDelays, stack);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ee3cbc3..5675b53 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -56,6 +56,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
@@ -314,7 +315,7 @@ public class CheckpointTest
     o4p1.checkpoints.add(leafCheckpoint);
 
     UpdateCheckpointsContext ctx;
-    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true));
+    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
     Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 359da17..06f184f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -20,7 +20,11 @@ package com.datatorrent.stram.plan.logical;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
@@ -32,8 +36,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -42,8 +52,17 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
+import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -75,7 +94,7 @@ public class DelayOperatorTest
     GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
     GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
     GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
-    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
 
     dag.addStream("BtoC", opB.outport1, opC.inport1);
     dag.addStream("CtoD", opC.outport1, opD.inport1);
@@ -83,7 +102,7 @@ public class DelayOperatorTest
     dag.addStream("DelayToD", opDelay.output, opD.inport2);
 
     List<List<String>> invalidDelays = new ArrayList<>();
-    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
     assertEquals("operator invalid delay", 1, invalidDelays.size());
 
     try {
@@ -106,7 +125,7 @@ public class DelayOperatorTest
     dag.addStream("DelayToC", opDelay.output, opC.inport2);
 
     invalidDelays = new ArrayList<>();
-    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
     assertEquals("operator invalid delay", 1, invalidDelays.size());
 
     try {
@@ -373,5 +392,68 @@ public class DelayOperatorTest
         Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
   }
 
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testCheckpointUpdate()
+  {
+    LogicalPlan dag = StramTestSupport.createDAG(testMeta);
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.validate();
+
+    dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+    StreamingContainerManager scm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = scm.getPhysicalPlan();
+    // set all operators as active to enable recovery window id update
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      oper.setState(PTOperator.State.ACTIVE);
+    }
+
+    Clock clock = new SystemClock();
+
+    PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
+    PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
+    PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
+    PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
+    PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);
+
+    Checkpoint cp3 = new Checkpoint(3L, 0, 0);
+    Checkpoint cp5 = new Checkpoint(5L, 0, 0);
+    Checkpoint cp4 = new Checkpoint(4L, 0, 0);
+
+    opB1.checkpoints.add(cp3);
+    opC1.checkpoints.add(cp3);
+    opC1.checkpoints.add(cp4);
+    opDelay1.checkpoints.add(cp3);
+    opDelay1.checkpoints.add(cp5);
+    opD1.checkpoints.add(cp5);
+    // construct grouping that would be supplied through LogicalPlan
+    Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
+    Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
+    for (OperatorMeta om : stronglyConnected) {
+      groups.put(om, stronglyConnected);
+    }
+
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
+    scm.updateRecoveryCheckpoints(opB1, ctx);
+
+    Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index a4ac488..9383f12 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.stram.plan.logical;
 
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -35,6 +36,7 @@ import javax.validation.constraints.Pattern;
 import com.esotericsoftware.kryo.DefaultSerializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,7 +45,6 @@ import static org.junit.Assert.*;
 
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.api.*;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -61,10 +62,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
 
-public class LogicalPlanTest {
+public class LogicalPlanTest
+{
 
   @Test
-  public void testCycleDetection() {
+  public void testCycleDetection()
+  {
      LogicalPlan dag = new LogicalPlan();
 
      //NodeConf operator1 = b.getOrAddNode("operator1");
@@ -91,20 +94,20 @@ public class LogicalPlanTest {
        // expected, stream can have single input/output only
      }
 
-     List<List<String>> cycles = new ArrayList<List<String>>();
-     dag.findStronglyConnected(dag.getMeta(operator7), cycles);
-     assertEquals("operator self reference", 1, cycles.size());
-     assertEquals("operator self reference", 1, cycles.get(0).size());
-     assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
+     LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+     dag.findStronglyConnected(dag.getMeta(operator7), vc);
+     assertEquals("operator self reference", 1, vc.invalidCycles.size());
+     assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size());
+     assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next());
 
      // 3 operator cycle
-     cycles.clear();
-     dag.findStronglyConnected(dag.getMeta(operator4), cycles);
-     assertEquals("3 operator cycle", 1, cycles.size());
-     assertEquals("3 operator cycle", 3, cycles.get(0).size());
-     assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
-     assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
-     assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
+     vc = new LogicalPlan.ValidationContext();
+     dag.findStronglyConnected(dag.getMeta(operator4), vc);
+     assertEquals("3 operator cycle", 1, vc.invalidCycles.size());
+     assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size());
+     assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2)));
+     assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3)));
+     assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4)));
 
      try {
        dag.validate();
@@ -115,13 +118,44 @@ public class LogicalPlanTest {
 
   }
 
-  public static class ValidationOperator extends BaseOperator {
+  @Test
+  public void testCycleDetectionWithDelay()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+    DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>());
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DtoDelay", opD.outport1, opDelay2.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.addStream("Delay2ToC", opDelay2.output, opC.inport2);
+
+    LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+    dag.findStronglyConnected(dag.getMeta(opA), vc);
+
+    Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles);
+    Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD));
+    Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0));
+  }
+
+
+  public static class ValidationOperator extends BaseOperator
+  {
     public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
 
     public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
   }
 
-  public static class CounterOperator extends BaseOperator {
+  public static class CounterOperator extends BaseOperator
+  {
     final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
       @Override
       final public void process(Object payload) {
@@ -130,8 +164,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testLogicalPlanSerialization() throws Exception {
-
+  public void testLogicalPlanSerialization() throws Exception
+  {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
@@ -188,7 +222,8 @@ public class LogicalPlanTest {
     Assert.assertEquals("", 2, dag.getAllOperators().size());
   }
 
-  public static class ValidationTestOperator extends BaseOperator implements InputOperator {
+  public static class ValidationTestOperator extends BaseOperator implements InputOperator
+  {
     @NotNull
     @Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
     private String stringField1;
@@ -271,8 +306,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOperatorValidation() {
-
+  public void testOperatorValidation()
+  {
     ValidationTestOperator bean = new ValidationTestOperator();
     bean.stringField1 = "malhar1";
     bean.intField1 = 1;
@@ -348,7 +383,8 @@ public class LogicalPlanTest {
   }
 
   @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator extends BaseOperator {
+  public static class TestOperatorAnnotationOperator extends BaseOperator
+  {
 
     @InputPortFieldAnnotation( optional = true)
     final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
@@ -358,11 +394,13 @@ public class LogicalPlanTest {
     };
   }
 
-  class NoInputPortOperator extends BaseOperator {
+  class NoInputPortOperator extends BaseOperator
+  {
   }
 
   @Test
-  public void testValidationForNonInputRootOperator() {
+  public void testValidationForNonInputRootOperator()
+  {
     LogicalPlan dag = new LogicalPlan();
     NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
     try {
@@ -374,8 +412,8 @@ public class LogicalPlanTest {
   }
 
   @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
-
+  public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2>
+  {
     @Override
     public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
     {
@@ -389,7 +427,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOperatorAnnotation() {
+  public void testOperatorAnnotation()
+  {
     LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
     TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
@@ -430,8 +469,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testPortConnectionValidation() {
-
+  public void testPortConnectionValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
@@ -459,7 +498,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testAtMostOnceProcessingModeValidation() {
+  public void testAtMostOnceProcessingModeValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -489,8 +529,9 @@ public class LogicalPlanTest {
 
   }
 
-    @Test
-  public void testExactlyOnceProcessingModeValidation() {
+  @Test
+  public void testExactlyOnceProcessingModeValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -527,7 +568,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testLocalityValidation() {
+  public void testLocalityValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -549,7 +591,8 @@ public class LogicalPlanTest {
     dag.validate();
   }
 
-  private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
+  private class TestAnnotationsOperator extends BaseOperator implements InputOperator
+  {
     //final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
 
     @OutputPortFieldAnnotation( optional=false)
@@ -562,7 +605,8 @@ public class LogicalPlanTest {
     }
   }
 
-  private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
+  private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
+  {
     // multiple ports w/o annotation, one of them must be connected
     final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
 
@@ -573,7 +617,8 @@ public class LogicalPlanTest {
     }
   }
 
-  private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
+  private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
+  {
     // multiple ports w/o annotation, one of them must be connected
     @OutputPortFieldAnnotation( optional=true)
     final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -587,7 +632,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOutputPortAnnotation() {
+  public void testOutputPortAnnotation()
+  {
     LogicalPlan dag = new LogicalPlan();
     TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
 
@@ -623,7 +669,8 @@ public class LogicalPlanTest {
    * Operator that can be used with default Java serialization instead of Kryo
    */
   @DefaultSerializer(JavaSerializer.class)
-  public static class JdkSerializableOperator extends BaseOperator implements Serializable {
+  public static class JdkSerializableOperator extends BaseOperator implements Serializable
+  {
     private static final long serialVersionUID = -4024202339520027097L;
 
     public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
@@ -673,7 +720,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testJdkSerializableOperator() throws Exception {
+  public void testJdkSerializableOperator() throws Exception
+  {
     LogicalPlan dag = new LogicalPlan();
     dag.addOperator("o1", new JdkSerializableOperator());
 
@@ -785,7 +833,8 @@ public class LogicalPlanTest {
     }
   }
 
-  public static class TestPortCodecOperator extends BaseOperator {
+  public static class TestPortCodecOperator extends BaseOperator
+  {
     public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
     {
       @Override



[2/3] incubator-apex-core git commit: moved attribute from context to logical plan

Posted by th...@apache.org.
moved attribute from context to logical plan


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6

Branch: refs/heads/devel-3
Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad
Parents: f7e1ccf
Author: Gaurav <ga...@datatorrent.com>
Authored: Wed Dec 16 06:33:54 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:04:27 2016 -0800

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/Context.java           | 7 -------
 .../main/java/com/datatorrent/stram/engine/GenericNode.java  | 3 ++-
 .../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++-
 .../com/datatorrent/stram/plan/physical/PhysicalPlan.java    | 4 ++--
 .../com/datatorrent/stram/plan/physical/StreamMapping.java   | 2 +-
 5 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 58bc552..ceed8a2 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,13 +166,6 @@ public interface Context
      */
     Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
 
-    /**
-     * Attribute of input port.
-     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
-     * This is for iterative processing.
-     */
-    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
-
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 4777f93..1ccec31 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.netlet.util.CircularBuffer;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.Operators;
 import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -207,7 +208,7 @@ public class GenericNode extends Node<Operator>
     if (pcPair == null || pcPair.context == null) {
       return false;
     }
-    return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+    return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3c26118..883ad71 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider;
  */
 public class LogicalPlan implements Serializable, DAG
 {
+  /**
+   * Attribute of input port.
+   * This is a read-only attribute to query whether the input port is connected to a DelayOperator
+   * This is for iterative processing.
+   */
+  public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   private static final long serialVersionUID = -2099729915606048704L;
   private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
@@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG
       for (InputPortMeta sink: downStream.sinks) {
         if (om.getOperator() instanceof Operator.DelayOperator) {
           // this is an iteration loop, do not treat it as downstream when detecting cycles
-          sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
           continue;
         }
         OperatorMeta successor = sink.getOperatorWrapper();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index da96ef3..c696224 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable
                 PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                   sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                 StreamMapping.addInput(slidingUnifier, sourceOut, null);
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                 sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
               }
               else {
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
               }
               oper.inputs.add(input);
             }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 91c6eef..f30ceb6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
     // link to upstream output(s) for this stream
     for (PTOutput upstreamOut : sourceOper.outputs) {
       if (upstreamOut.logicalStream == streamMeta) {
-        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
         oper.inputs.add(input);
       }
     }