You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:04 UTC

[3/3] incubator-apex-core git commit: APEXCORE-306 Update checkpoints for strongly connected operators as group.

APEXCORE-306 Update checkpoints for strongly connected operators as group.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3402be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3402be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3402be5

Branch: refs/heads/devel-3
Commit: b3402be5a45728515f4a8328fec5a76ddede0350
Parents: 4d5828c
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Jan 21 16:39:55 2016 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:24:31 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 161 +++++++++++++------
 .../com/datatorrent/stram/api/Checkpoint.java   |  11 ++
 .../stram/plan/logical/LogicalPlan.java         |  91 +++++++----
 .../com/datatorrent/stram/CheckpointTest.java   |   3 +-
 .../stram/plan/logical/DelayOperatorTest.java   |  88 +++++++++-
 .../stram/plan/logical/LogicalPlanTest.java     | 131 ++++++++++-----
 6 files changed, 358 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6233697..a687a37 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -35,6 +35,7 @@ import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -159,6 +160,7 @@ public class StreamingContainerManager implements PlanContext
   private long lastResourceRequest = 0;
   private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
   private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+  private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
   private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
   private CriticalPathInfo criticalPathInfo;
   private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
@@ -812,6 +814,7 @@ public class StreamingContainerManager implements PlanContext
     Collection<OperatorMeta> logicalOperators = getLogicalPlan().getAllOperators();
     //for backward compatibility
     for (OperatorMeta operatorMeta : logicalOperators) {
+      @SuppressWarnings("deprecation")
       Context.CountersAggregator aggregator = operatorMeta.getValue(OperatorContext.COUNTERS_AGGREGATOR);
       if (aggregator == null) {
         continue;
@@ -825,6 +828,7 @@ public class StreamingContainerManager implements PlanContext
         }
       }
       if (counters.size() > 0) {
+        @SuppressWarnings("deprecation")
         Object aggregate = aggregator.aggregate(counters);
         latestLogicalCounters.put(operatorMeta.getName(), aggregate);
       }
@@ -857,6 +861,8 @@ public class StreamingContainerManager implements PlanContext
         if (windowMetrics == null) {
           windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE)
           {
+            private static final long serialVersionUID = 1L;
+
             @Override
             public boolean add(Pair<Long, Map<String, Object>> longMapPair)
             {
@@ -1134,7 +1140,7 @@ public class StreamingContainerManager implements PlanContext
     cs.container.setAllocatedVCores(0);
 
     // resolve dependencies
-    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
     for (PTOperator oper : cs.container.getOperators()) {
       updateRecoveryCheckpoints(oper, ctx);
     }
@@ -1881,31 +1887,18 @@ public class StreamingContainerManager implements PlanContext
     public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
     public final long currentTms;
     public final boolean recovery;
+    public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
 
     public UpdateCheckpointsContext(Clock clock)
     {
-      this.currentTms = clock.getTime();
-      this.recovery = false;
+      this(clock, false, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap());
     }
 
-    public UpdateCheckpointsContext(Clock clock, boolean recovery)
+    public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups)
     {
       this.currentTms = clock.getTime();
       this.recovery = recovery;
-    }
-
-  }
-
-  private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
-  {
-    ctx.visited.add(operator);
-    for (PTOperator.PTOutput out : operator.getOutputs()) {
-      for (PTOperator.PTInput sink : out.sinks) {
-        PTOperator sinkOperator = sink.target;
-        if (!ctx.visited.contains(sinkOperator)) {
-          addVisited(sinkOperator, ctx);
-        }
-      }
+      this.checkpointGroups = checkpointGroups;
     }
   }
 
@@ -1933,20 +1926,55 @@ public class StreamingContainerManager implements PlanContext
       }
     }
 
-    long maxCheckpoint = operator.getRecentCheckpoint().windowId;
-    if (ctx.recovery && maxCheckpoint == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
-      long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
-      maxCheckpoint = currentWindowId;
+    // the most recent checkpoint eligible for recovery based on downstream state
+    Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
+
+    Set<OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta());
+    if (checkpointGroup == null) {
+      checkpointGroup = Collections.singleton(operator.getOperatorMeta());
+    }
+    // find intersection of checkpoints that group can collectively move to
+    TreeSet<Checkpoint> commonCheckpoints = new TreeSet<>(new Checkpoint.CheckpointComparator());
+    synchronized (operator.checkpoints) {
+      commonCheckpoints.addAll(operator.checkpoints);
+    }
+    Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+    if (checkpointGroup.size() > 1) {
+      for (OperatorMeta om : checkpointGroup) {
+        Collection<PTOperator> operators = plan.getAllOperators(om);
+        for (PTOperator groupOper : operators) {
+          synchronized (groupOper.checkpoints) {
+            commonCheckpoints.retainAll(groupOper.checkpoints);
+          }
+          // visit all downstream operators of the group
+          ctx.visited.add(groupOper);
+          groupOpers.add(groupOper);
+        }
+      }
+      // highest common checkpoint
+      if (!commonCheckpoints.isEmpty()) {
+        maxCheckpoint = commonCheckpoints.last();
+      }
+    } else {
+      // without logical grouping, treat partitions as independent
+      // this is especially important for parallel partitioning
+      ctx.visited.add(operator);
+      groupOpers.add(operator);
+      maxCheckpoint = operator.getRecentCheckpoint();
+      if (ctx.recovery && maxCheckpoint.windowId == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
+        long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
+        maxCheckpoint = new Checkpoint(currentWindowId, 0, 0);
+      }
     }
-    ctx.visited.add(operator);
 
     // DFS downstream operators
-    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
-      addVisited(operator, ctx);
-    } else {
-      for (PTOperator.PTOutput out : operator.getOutputs()) {
+    for (PTOperator groupOper : groupOpers) {
+      for (PTOperator.PTOutput out : groupOper.getOutputs()) {
         for (PTOperator.PTInput sink : out.sinks) {
           PTOperator sinkOperator = sink.target;
+          if (groupOpers.contains(sinkOperator)) {
+            continue; // downstream operator within group
+          }
           if (!ctx.visited.contains(sinkOperator)) {
             // downstream traversal
             updateRecoveryCheckpoints(sinkOperator, ctx);
@@ -1954,7 +1982,7 @@ public class StreamingContainerManager implements PlanContext
           // recovery window id cannot move backwards
           // when dynamically adding new operators
           if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
-            maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+            maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint());
           }
 
           if (ctx.blocked.contains(sinkOperator)) {
@@ -1967,33 +1995,43 @@ public class StreamingContainerManager implements PlanContext
       }
     }
 
-    // checkpoint frozen during deployment
-    if (ctx.recovery || operator.getState() != PTOperator.State.PENDING_DEPLOY) {
-      // remove previous checkpoints
-      Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
-      synchronized (operator.checkpoints) {
-        if (!operator.checkpoints.isEmpty() && (operator.checkpoints.getFirst()).windowId <= maxCheckpoint) {
-          c1 = operator.checkpoints.getFirst();
-          Checkpoint c2;
-          while (operator.checkpoints.size() > 1 && ((c2 = operator.checkpoints.get(1)).windowId) <= maxCheckpoint) {
-            operator.checkpoints.removeFirst();
-            //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
-            this.purgeCheckpoints.add(new Pair<PTOperator, Long>(operator, c1.windowId));
-            c1 = c2;
+    // find the common checkpoint that is <= downstream recovery checkpoint
+    if (!commonCheckpoints.contains(maxCheckpoint)) {
+      if (!commonCheckpoints.isEmpty()) {
+        maxCheckpoint = Objects.firstNonNull(commonCheckpoints.floor(maxCheckpoint), maxCheckpoint);
+      }
+    }
+
+    for (PTOperator groupOper : groupOpers) {
+      // checkpoint frozen during deployment
+      if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+        // remove previous checkpoints
+        Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
+        LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;
+        synchronized (checkpoints) {
+          if (!checkpoints.isEmpty() && (checkpoints.getFirst()).windowId <= maxCheckpoint.windowId) {
+            c1 = checkpoints.getFirst();
+            Checkpoint c2;
+            while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) {
+              checkpoints.removeFirst();
+              //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
+              this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId));
+              c1 = c2;
+            }
           }
-        }
-        else {
-          if (ctx.recovery && operator.checkpoints.isEmpty() && operator.isOperatorStateLess()) {
-            LOG.debug("Adding checkpoint for stateless operator {} {}", operator, Codec.getStringWindowId(maxCheckpoint));
-            c1 = operator.addCheckpoint(maxCheckpoint, this.vars.windowStartMillis);
+          else {
+            if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
+              LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId));
+              c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
+            }
           }
         }
+        //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
+        groupOper.setRecoveryCheckpoint(c1);
+      }
+      else {
+        LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState());
       }
-      //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
-      operator.setRecoveryCheckpoint(c1);
-    }
-    else {
-      LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
     }
 
   }
@@ -2009,13 +2047,32 @@ public class StreamingContainerManager implements PlanContext
     return this.vars.windowStartMillis;
   }
 
+  private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+  {
+    if (this.checkpointGroups == null) {
+      this.checkpointGroups = new HashMap<>();
+      LogicalPlan dag = this.plan.getLogicalPlan();
+      dag.resetNIndex();
+      LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+      for (OperatorMeta om : dag.getRootOperators()) {
+        this.plan.getLogicalPlan().findStronglyConnected(om, vc);
+      }
+      for (Set<OperatorMeta> checkpointGroup : vc.stronglyConnected) {
+        for (OperatorMeta om : checkpointGroup) {
+          this.checkpointGroups.put(om, checkpointGroup);
+        }
+      }
+    }
+    return checkpointGroups;
+  }
+
   /**
    * Visit all operators to update current checkpoint based on updated downstream state.
    * Purge older checkpoints that are no longer needed.
    */
   private long updateCheckpoints(boolean recovery)
   {
-    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery);
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
     for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
       //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
       List<PTOperator> operators = plan.getOperators(logicalOperator);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
index 5ec4a7e..d24b17a 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.stram.api;
 
+import java.util.Comparator;
+
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.bufferserver.util.Codec;
 
@@ -102,6 +104,15 @@ public class Checkpoint implements com.datatorrent.api.Stats.Checkpoint
     return windowId;
   }
 
+  public static class CheckpointComparator implements Comparator<Checkpoint>
+  {
+    @Override
+    public int compare(Checkpoint o1, Checkpoint o2)
+    {
+      return Long.compare(o1.windowId, o2.windowId);
+    }
+  }
+
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   public static final Checkpoint INITIAL_CHECKPOINT = new Checkpoint(Stateless.WINDOW_ID, 0, 0);
   private static final long serialVersionUID = 201402152116L;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 883ad71..6d7ebe1 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -159,8 +159,6 @@ public class LogicalPlan implements Serializable, DAG
   public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
   private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
-  private transient int nodeIndex = 0; // used for cycle validation
-  private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation
   private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
 
   @Override
@@ -1540,6 +1538,7 @@ public class LogicalPlan implements Serializable, DAG
     return this.operators.get(operatorName);
   }
 
+  @Override
   public ModuleMeta getModuleMeta(String moduleName)
   {
     return this.modules.get(moduleName);
@@ -1557,6 +1556,7 @@ public class LogicalPlan implements Serializable, DAG
     throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
   }
 
+  @Override
   public ModuleMeta getMeta(Module module)
   {
     for (ModuleMeta m : getAllModules()) {
@@ -1626,6 +1626,24 @@ public class LogicalPlan implements Serializable, DAG
     return classNames;
   }
 
+  public static class ValidationContext
+  {
+    public int nodeIndex = 0;
+    public Stack<OperatorMeta> stack = new Stack<OperatorMeta>();
+    public Stack<OperatorMeta> path = new Stack<OperatorMeta>();
+    public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>();
+    public OperatorMeta invalidLoopAt;
+    public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>();
+  }
+
+  public void resetNIndex()
+  {
+    for (OperatorMeta om : getAllOperators()) {
+      om.lowlink = null;
+      om.nindex = null;
+    }
+  }
+
   /**
    * Validate the plan. Includes checks that required ports are connected,
    * required configuration parameters specified, graph free of cycles etc.
@@ -1752,21 +1770,20 @@ public class LogicalPlan implements Serializable, DAG
         throw new ValidationException("At least one output port must be connected: " + n.name);
       }
     }
-    stack = new Stack<OperatorMeta>();
 
-    List<List<String>> cycles = new ArrayList<List<String>>();
+    ValidationContext validatonContext = new ValidationContext();
     for (OperatorMeta n: operators.values()) {
       if (n.nindex == null) {
-        findStronglyConnected(n, cycles);
+        findStronglyConnected(n, validatonContext);
       }
     }
-    if (!cycles.isEmpty()) {
-      throw new ValidationException("Loops in graph: " + cycles);
+    if (!validatonContext.invalidCycles.isEmpty()) {
+      throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles);
     }
 
     List<List<String>> invalidDelays = new ArrayList<>();
     for (OperatorMeta n : rootOperators) {
-      findInvalidDelays(n, invalidDelays);
+      findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
     }
     if (!invalidDelays.isEmpty()) {
       throw new ValidationException("Invalid delays in graph: " + invalidDelays);
@@ -1908,59 +1925,72 @@ public class LogicalPlan implements Serializable, DAG
    * @param om
    * @param cycles
    */
-  public void findStronglyConnected(OperatorMeta om, List<List<String>> cycles)
+  public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
   {
-    om.nindex = nodeIndex;
-    om.lowlink = nodeIndex;
-    nodeIndex++;
-    stack.push(om);
+    om.nindex = ctx.nodeIndex;
+    om.lowlink = ctx.nodeIndex;
+    ctx.nodeIndex++;
+    ctx.stack.push(om);
+    ctx.path.push(om);
 
     // depth first successors traversal
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink: downStream.sinks) {
-        if (om.getOperator() instanceof Operator.DelayOperator) {
-          // this is an iteration loop, do not treat it as downstream when detecting cycles
-          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
-          continue;
-        }
         OperatorMeta successor = sink.getOperatorWrapper();
         if (successor == null) {
           continue;
         }
         // check for self referencing node
         if (om == successor) {
-          cycles.add(Collections.singletonList(om.name));
+          ctx.invalidCycles.add(Collections.singleton(om));
         }
         if (successor.nindex == null) {
           // not visited yet
-          findStronglyConnected(successor, cycles);
+          findStronglyConnected(successor, ctx);
           om.lowlink = Math.min(om.lowlink, successor.lowlink);
         }
-        else if (stack.contains(successor)) {
+        else if (ctx.stack.contains(successor)) {
           om.lowlink = Math.min(om.lowlink, successor.nindex);
+          boolean isDelayLoop = false;
+          for (int i=ctx.path.size(); i>0; i--) {
+            OperatorMeta om2 = ctx.path.get(i-1);
+            if (om2.getOperator() instanceof Operator.DelayOperator) {
+              isDelayLoop = true;
+            }
+            if (om2 == successor) {
+              break;
+            }
+          }
+          if (!isDelayLoop) {
+            ctx.invalidLoopAt = successor;
+          }
         }
       }
     }
 
     // pop stack for all root operators
     if (om.lowlink.equals(om.nindex)) {
-      List<String> connectedIds = new ArrayList<String>();
-      while (!stack.isEmpty()) {
-        OperatorMeta n2 = stack.pop();
-        connectedIds.add(n2.name);
+      Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size());
+      while (!ctx.stack.isEmpty()) {
+        OperatorMeta n2 = ctx.stack.pop();
+        connectedSet.add(n2);
         if (n2 == om) {
           break; // collected all connected operators
         }
       }
       // strongly connected (cycle) if more than one node in stack
-      if (connectedIds.size() > 1) {
-        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
-        cycles.add(connectedIds);
+      if (connectedSet.size() > 1) {
+        ctx.stronglyConnected.add(connectedSet);
+        if (connectedSet.contains(ctx.invalidLoopAt)) {
+          ctx.invalidCycles.add(connectedSet);
+        }
       }
     }
+    ctx.path.pop();
+
   }
 
-  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack)
   {
     stack.push(om);
 
@@ -1977,6 +2007,7 @@ public class LogicalPlan implements Serializable, DAG
       for (InputPortMeta sink : downStream.sinks) {
         OperatorMeta successor = sink.getOperatorWrapper();
         if (isDelayOperator) {
+          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
           // Check whether all downstream operators are already visited in the path
           if (successor != null && !stack.contains(successor)) {
             LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
@@ -1984,7 +2015,7 @@ public class LogicalPlan implements Serializable, DAG
             invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
           }
         } else {
-          findInvalidDelays(successor, invalidDelays);
+          findInvalidDelays(successor, invalidDelays, stack);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ee3cbc3..5675b53 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -56,6 +56,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
@@ -314,7 +315,7 @@ public class CheckpointTest
     o4p1.checkpoints.add(leafCheckpoint);
 
     UpdateCheckpointsContext ctx;
-    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true));
+    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
     Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 359da17..06f184f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -20,7 +20,11 @@ package com.datatorrent.stram.plan.logical;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
@@ -32,8 +36,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -42,8 +52,17 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
+import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -75,7 +94,7 @@ public class DelayOperatorTest
     GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
     GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
     GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
-    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
 
     dag.addStream("BtoC", opB.outport1, opC.inport1);
     dag.addStream("CtoD", opC.outport1, opD.inport1);
@@ -83,7 +102,7 @@ public class DelayOperatorTest
     dag.addStream("DelayToD", opDelay.output, opD.inport2);
 
     List<List<String>> invalidDelays = new ArrayList<>();
-    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
     assertEquals("operator invalid delay", 1, invalidDelays.size());
 
     try {
@@ -106,7 +125,7 @@ public class DelayOperatorTest
     dag.addStream("DelayToC", opDelay.output, opC.inport2);
 
     invalidDelays = new ArrayList<>();
-    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
     assertEquals("operator invalid delay", 1, invalidDelays.size());
 
     try {
@@ -373,5 +392,68 @@ public class DelayOperatorTest
         Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
   }
 
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testCheckpointUpdate()
+  {
+    LogicalPlan dag = StramTestSupport.createDAG(testMeta);
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.validate();
+
+    dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+    StreamingContainerManager scm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = scm.getPhysicalPlan();
+    // set all operators as active to enable recovery window id update
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      oper.setState(PTOperator.State.ACTIVE);
+    }
+
+    Clock clock = new SystemClock();
+
+    PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
+    PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
+    PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
+    PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
+    PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);
+
+    Checkpoint cp3 = new Checkpoint(3L, 0, 0);
+    Checkpoint cp5 = new Checkpoint(5L, 0, 0);
+    Checkpoint cp4 = new Checkpoint(4L, 0, 0);
+
+    opB1.checkpoints.add(cp3);
+    opC1.checkpoints.add(cp3);
+    opC1.checkpoints.add(cp4);
+    opDelay1.checkpoints.add(cp3);
+    opDelay1.checkpoints.add(cp5);
+    opD1.checkpoints.add(cp5);
+    // construct grouping that would be supplied through LogicalPlan
+    Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
+    Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
+    for (OperatorMeta om : stronglyConnected) {
+      groups.put(om, stronglyConnected);
+    }
+
+    UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
+    scm.updateRecoveryCheckpoints(opB1, ctx);
+
+    Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
+    Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index a4ac488..9383f12 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.stram.plan.logical;
 
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -35,6 +36,7 @@ import javax.validation.constraints.Pattern;
 import com.esotericsoftware.kryo.DefaultSerializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,7 +45,6 @@ import static org.junit.Assert.*;
 
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.api.*;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -61,10 +62,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
 
-public class LogicalPlanTest {
+public class LogicalPlanTest
+{
 
   @Test
-  public void testCycleDetection() {
+  public void testCycleDetection()
+  {
      LogicalPlan dag = new LogicalPlan();
 
      //NodeConf operator1 = b.getOrAddNode("operator1");
@@ -91,20 +94,20 @@ public class LogicalPlanTest {
        // expected, stream can have single input/output only
      }
 
-     List<List<String>> cycles = new ArrayList<List<String>>();
-     dag.findStronglyConnected(dag.getMeta(operator7), cycles);
-     assertEquals("operator self reference", 1, cycles.size());
-     assertEquals("operator self reference", 1, cycles.get(0).size());
-     assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
+     LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+     dag.findStronglyConnected(dag.getMeta(operator7), vc);
+     assertEquals("operator self reference", 1, vc.invalidCycles.size());
+     assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size());
+     assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next());
 
      // 3 operator cycle
-     cycles.clear();
-     dag.findStronglyConnected(dag.getMeta(operator4), cycles);
-     assertEquals("3 operator cycle", 1, cycles.size());
-     assertEquals("3 operator cycle", 3, cycles.get(0).size());
-     assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
-     assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
-     assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
+     vc = new LogicalPlan.ValidationContext();
+     dag.findStronglyConnected(dag.getMeta(operator4), vc);
+     assertEquals("3 operator cycle", 1, vc.invalidCycles.size());
+     assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size());
+     assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2)));
+     assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3)));
+     assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4)));
 
      try {
        dag.validate();
@@ -115,13 +118,44 @@ public class LogicalPlanTest {
 
   }
 
-  public static class ValidationOperator extends BaseOperator {
+  @Test
+  public void testCycleDetectionWithDelay()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+    DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>());
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DtoDelay", opD.outport1, opDelay2.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.addStream("Delay2ToC", opDelay2.output, opC.inport2);
+
+    LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+    dag.findStronglyConnected(dag.getMeta(opA), vc);
+
+    Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles);
+    Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD));
+    Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0));
+  }
+
+
+  public static class ValidationOperator extends BaseOperator
+  {
     public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
 
     public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
   }
 
-  public static class CounterOperator extends BaseOperator {
+  public static class CounterOperator extends BaseOperator
+  {
     final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
       @Override
       final public void process(Object payload) {
@@ -130,8 +164,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testLogicalPlanSerialization() throws Exception {
-
+  public void testLogicalPlanSerialization() throws Exception
+  {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
@@ -188,7 +222,8 @@ public class LogicalPlanTest {
     Assert.assertEquals("", 2, dag.getAllOperators().size());
   }
 
-  public static class ValidationTestOperator extends BaseOperator implements InputOperator {
+  public static class ValidationTestOperator extends BaseOperator implements InputOperator
+  {
     @NotNull
     @Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
     private String stringField1;
@@ -271,8 +306,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOperatorValidation() {
-
+  public void testOperatorValidation()
+  {
     ValidationTestOperator bean = new ValidationTestOperator();
     bean.stringField1 = "malhar1";
     bean.intField1 = 1;
@@ -348,7 +383,8 @@ public class LogicalPlanTest {
   }
 
   @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator extends BaseOperator {
+  public static class TestOperatorAnnotationOperator extends BaseOperator
+  {
 
     @InputPortFieldAnnotation( optional = true)
     final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
@@ -358,11 +394,13 @@ public class LogicalPlanTest {
     };
   }
 
-  class NoInputPortOperator extends BaseOperator {
+  class NoInputPortOperator extends BaseOperator
+  {
   }
 
   @Test
-  public void testValidationForNonInputRootOperator() {
+  public void testValidationForNonInputRootOperator()
+  {
     LogicalPlan dag = new LogicalPlan();
     NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
     try {
@@ -374,8 +412,8 @@ public class LogicalPlanTest {
   }
 
   @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
-
+  public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2>
+  {
     @Override
     public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
     {
@@ -389,7 +427,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOperatorAnnotation() {
+  public void testOperatorAnnotation()
+  {
     LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
     TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
@@ -430,8 +469,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testPortConnectionValidation() {
-
+  public void testPortConnectionValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
@@ -459,7 +498,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testAtMostOnceProcessingModeValidation() {
+  public void testAtMostOnceProcessingModeValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -489,8 +529,9 @@ public class LogicalPlanTest {
 
   }
 
-    @Test
-  public void testExactlyOnceProcessingModeValidation() {
+  @Test
+  public void testExactlyOnceProcessingModeValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -527,7 +568,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testLocalityValidation() {
+  public void testLocalityValidation()
+  {
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -549,7 +591,8 @@ public class LogicalPlanTest {
     dag.validate();
   }
 
-  private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
+  private class TestAnnotationsOperator extends BaseOperator implements InputOperator
+  {
     //final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
 
     @OutputPortFieldAnnotation( optional=false)
@@ -562,7 +605,8 @@ public class LogicalPlanTest {
     }
   }
 
-  private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
+  private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
+  {
     // multiple ports w/o annotation, one of them must be connected
     final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
 
@@ -573,7 +617,8 @@ public class LogicalPlanTest {
     }
   }
 
-  private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
+  private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
+  {
     // multiple ports w/o annotation, one of them must be connected
     @OutputPortFieldAnnotation( optional=true)
     final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -587,7 +632,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testOutputPortAnnotation() {
+  public void testOutputPortAnnotation()
+  {
     LogicalPlan dag = new LogicalPlan();
     TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
 
@@ -623,7 +669,8 @@ public class LogicalPlanTest {
    * Operator that can be used with default Java serialization instead of Kryo
    */
   @DefaultSerializer(JavaSerializer.class)
-  public static class JdkSerializableOperator extends BaseOperator implements Serializable {
+  public static class JdkSerializableOperator extends BaseOperator implements Serializable
+  {
     private static final long serialVersionUID = -4024202339520027097L;
 
     public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
@@ -673,7 +720,8 @@ public class LogicalPlanTest {
   }
 
   @Test
-  public void testJdkSerializableOperator() throws Exception {
+  public void testJdkSerializableOperator() throws Exception
+  {
     LogicalPlan dag = new LogicalPlan();
     dag.addOperator("o1", new JdkSerializableOperator());
 
@@ -785,7 +833,8 @@ public class LogicalPlanTest {
     }
   }
 
-  public static class TestPortCodecOperator extends BaseOperator {
+  public static class TestPortCodecOperator extends BaseOperator
+  {
     public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
     {
       @Override