You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/02 21:12:21 UTC

apex-core git commit: APEXCORE-426 Reuse the running containers when the Stram restarts.

Repository: apex-core
Updated Branches:
  refs/heads/master 58930cc57 -> 3b660c9c1


APEXCORE-426 Reuse the running containers when the Stram restarts.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3b660c9c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3b660c9c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3b660c9c

Branch: refs/heads/master
Commit: 3b660c9c1dd639c49ae300fd3f70397d3f794d12
Parents: 58930cc
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Fri Nov 4 17:31:59 2016 -0700
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Thu Mar 2 13:04:33 2017 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StramClient.java |  2 +
 .../datatorrent/stram/StramLocalCluster.java    |  2 +-
 .../stram/StreamingAppMasterService.java        | 38 +++++++------------
 .../stram/StreamingContainerManager.java        | 16 ++++----
 .../com/datatorrent/stram/CheckpointTest.java   | 39 +++++++++-----------
 .../stram/StramLocalClusterTest.java            |  2 +-
 .../datatorrent/stram/StramRecoveryTest.java    |  2 +-
 .../stram/StreamingContainerManagerTest.java    |  8 ++--
 .../stram/plan/logical/DelayOperatorTest.java   |  2 +-
 9 files changed, 50 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 8b78c14..dad42e3 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -392,6 +392,8 @@ public class StramClient
       //appContext.setMaxAppAttempts(1); // no retries until Stram is HA
     }
 
+    appContext.setKeepContainersAcrossApplicationAttempts(true);
+
     // Set up the container launch context for the application master
     ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index e5d855b..ff61868 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -501,7 +501,7 @@ public class StramLocalCluster implements Runnable, Controller
 
       if (heartbeatMonitoringEnabled) {
         // monitor child containers
-        dnmgr.monitorHeartbeat();
+        dnmgr.monitorHeartbeat(false);
       }
 
       if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 3898dbc..c0e09ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -27,7 +27,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -63,9 +62,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -75,7 +71,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.WebApp;
@@ -740,9 +735,12 @@ public class StreamingAppMasterService extends CompositeService
       clientRMService.stop();
     }
 
-    // check for previously allocated containers
-    // as of 2.2, containers won't survive AM restart, but this will change in the future - YARN-1490
-    checkContainerStatus();
+    List<Container> containers = response.getContainersFromPreviousAttempts();
+
+    // Running containers might take a while to register with the new app master and send the heartbeat signal.
+    int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
+
+    previouslyAllocatedContainers(containers);
     FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
     final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADDRESS,
@@ -1028,7 +1026,9 @@ public class StreamingAppMasterService extends CompositeService
           loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
 
       // monitor child containers
-      dnmgr.monitorHeartbeat();
+      dnmgr.monitorHeartbeat(waitForRecovery > 0);
+
+      waitForRecovery = Math.max(waitForRecovery - 1, 0);
     }
 
     finishApplication(finalStatus);
@@ -1068,22 +1068,12 @@ public class StreamingAppMasterService extends CompositeService
    * Check for containers that were allocated in a previous attempt.
    * If the containers are still alive, wait for them to check in via heartbeat.
    */
-  private void checkContainerStatus()
+  private void previouslyAllocatedContainers(List<Container> containers)
   {
-    Collection<StreamingContainerAgent> containers = this.dnmgr.getContainerAgents();
-    for (StreamingContainerAgent ca : containers) {
-      ContainerId containerId = ConverterUtils.toContainerId(ca.container.getExternalId());
-      NodeId nodeId = ConverterUtils.toNodeId(ca.container.host);
-
-      // put container back into the allocated list
-      org.apache.hadoop.yarn.api.records.Token containerToken = null;
-      Resource resource = Resource.newInstance(ca.container.getAllocatedMemoryMB(), ca.container.getAllocatedVCores());
-      Priority priority = Priority.newInstance(ca.container.getResourceRequestPriority());
-      Container yarnContainer = Container.newInstance(containerId, nodeId, ca.container.nodeHttpAddress, resource, priority, containerToken);
-      this.allocatedContainers.put(containerId.toString(), new AllocatedContainer(yarnContainer));
-
-      // check the status
-      nmClient.getContainerStatusAsync(containerId, nodeId);
+    for (Container container : containers) {
+      this.allocatedContainers.put(container.getId().toString(), new AllocatedContainer(container));
+      //check the status
+      nmClient.getContainerStatusAsync(container.getId(), container.getNodeId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index dfbc7d1..fe27be9 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -749,7 +749,7 @@ public class StreamingContainerManager implements PlanContext
    * Check periodically that deployed containers phone home.
    * Run from the master main loop (single threaded access).
    */
-  public void monitorHeartbeat()
+  public void monitorHeartbeat(boolean waitForRecovery)
   {
     long currentTms = clock.getTime();
 
@@ -798,7 +798,7 @@ public class StreamingContainerManager implements PlanContext
     // events that may modify the plan
     processEvents();
 
-    committedWindowId = updateCheckpoints(false);
+    committedWindowId = updateCheckpoints(waitForRecovery);
     calculateEndWindowStats();
     if (this.vars.enableStatsRecording) {
       recordStats(currentTms);
@@ -1138,7 +1138,7 @@ public class StreamingContainerManager implements PlanContext
     // resolve dependencies
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
     for (PTOperator oper : cs.container.getOperators()) {
-      updateRecoveryCheckpoints(oper, ctx);
+      updateRecoveryCheckpoints(oper, ctx, false);
     }
     includeLocalUpstreamOperators(ctx);
 
@@ -1170,7 +1170,7 @@ public class StreamingContainerManager implements PlanContext
       }
       if (!newOperators.isEmpty()) {
         for (PTOperator oper : newOperators) {
-          updateRecoveryCheckpoints(oper, ctx);
+          updateRecoveryCheckpoints(oper, ctx, false);
         }
       }
     } while (!newOperators.isEmpty());
@@ -2022,7 +2022,7 @@ public class StreamingContainerManager implements PlanContext
    * @param operator Operator instance for which to find recovery checkpoint
    * @param ctx      Context into which to collect traversal info
    */
-  public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx)
+  public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx, boolean recovery)
   {
     if (operator.getRecoveryCheckpoint().windowId < ctx.committedWindowId.longValue()) {
       ctx.committedWindowId.setValue(operator.getRecoveryCheckpoint().windowId);
@@ -2031,7 +2031,7 @@ public class StreamingContainerManager implements PlanContext
     if (operator.getState() == PTOperator.State.ACTIVE &&
         (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
       // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
-      if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
+      if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId && !recovery) {
         LOG.warn("Marking operator {} blocked committed window {}, recovery window {}, current time {}, last window id change time {}, window processing timeout millis {}",
             operator,
             Codec.getStringWindowId(ctx.committedWindowId.longValue()),
@@ -2096,7 +2096,7 @@ public class StreamingContainerManager implements PlanContext
           }
           if (!ctx.visited.contains(sinkOperator)) {
             // downstream traversal
-            updateRecoveryCheckpoints(sinkOperator, ctx);
+            updateRecoveryCheckpoints(sinkOperator, ctx, recovery);
           }
           // recovery window id cannot move backwards
           // when dynamically adding new operators
@@ -2196,7 +2196,7 @@ public class StreamingContainerManager implements PlanContext
       if (operators != null) {
         for (PTOperator operator : operators) {
           operatorCount++;
-          updateRecoveryCheckpoints(operator, ctx);
+          updateRecoveryCheckpoints(operator, ctx, recovery);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index db939cd..d7f96d4 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -216,11 +216,11 @@ public class CheckpointTest
       Assert.assertEquals("", PTOperator.State.PENDING_DEPLOY, oper.getState());
     }
 
-    dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("no checkpoints " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
 
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("no checkpoints " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("number dependencies " + ctx.visited, 3, ctx.visited.size());
 
@@ -231,17 +231,17 @@ public class CheckpointTest
 
     o1p1.checkpoints.add(cp3);
     o1p1.checkpoints.add(cp5);
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
 
     o2p1.checkpoints.add(new Checkpoint(3L, 0, 0));
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
 
     // set leaf operator checkpoint
     dnm.addCheckpoint(o3SLp1, cp5);
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
 
@@ -249,20 +249,20 @@ public class CheckpointTest
     for (PTOperator oper : plan.getAllOperators().values()) {
       oper.setState(PTOperator.State.ACTIVE);
     }
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + o2p1, cp3, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + o3SLp1, cp5, o3SLp1.getRecoveryCheckpoint());
     Assert.assertNull("checkpoint null for stateless operator " + o3SLp1, o3SLp1.stats.checkpointStats);
 
     o2p1.checkpoints.add(cp4);
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + o2p1, cp4, o2p1.getRecoveryCheckpoint());
 
     o1p1.checkpoints.add(1, cp4);
     Assert.assertEquals(o1p1.checkpoints, getCheckpoints(3L, 4L, 5L));
-    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+    dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
     Assert.assertEquals("checkpoint " + o1p1, cp4, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals(o1p1.checkpoints, getCheckpoints(4L, 5L));
 
@@ -315,7 +315,7 @@ public class CheckpointTest
     o4p1.checkpoints.add(leafCheckpoint);
 
     UpdateCheckpointsContext ctx;
-    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
+    dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()), false);
     Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
     Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());
@@ -376,7 +376,7 @@ public class CheckpointTest
     PTOperator o2p1 = partitions.get(0);
 
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertTrue("no blocked operators", ctx.blocked.isEmpty());
 
     o1p1.stats.statsRevs.checkout();
@@ -387,25 +387,25 @@ public class CheckpointTest
     clock.time = o1p1.stats.windowProcessingTimeoutMillis + 1;
 
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked);
 
     // assign future activation window (state-less or at-most-once).
     Checkpoint cp2 = o2p1.getRecoveryCheckpoint();
     o2p1.setRecoveryCheckpoint(new Checkpoint(o1p1.getRecoveryCheckpoint().windowId + 1, cp2.applicationWindowCount, cp2.checkpointWindowCount));
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("no operators blocked (o2 activation window ahead)", Sets.newHashSet(), ctx.blocked);
 
     // reset to blocked
     o2p1.setRecoveryCheckpoint(cp2);
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked);
 
     clock.time++;
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1, o2p1), ctx.blocked);
 
     o2p1.stats.statsRevs.checkout();
@@ -413,16 +413,13 @@ public class CheckpointTest
     o2p1.stats.statsRevs.commit();
 
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1), ctx.blocked);
 
     clock.time--;
     ctx = new UpdateCheckpointsContext(clock);
-    dnm.updateRecoveryCheckpoints(o1p1, ctx);
+    dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
     Assert.assertEquals("operators blocked", Sets.newHashSet(), ctx.blocked);
-
-
-
   }
 
   @Test
@@ -473,13 +470,13 @@ public class CheckpointTest
     mc1.sendHeartbeat();
     Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
     Assert.assertEquals(10, o1p1.stats.lastWindowIdChangeTms);
-    scm.monitorHeartbeat();
+    scm.monitorHeartbeat(false);
     Assert.assertTrue(scm.containerStopRequests.isEmpty());
 
     clock.time++;
     mc1.sendHeartbeat();
     Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
-    scm.monitorHeartbeat();
+    scm.monitorHeartbeat(false);
     Assert.assertTrue(scm.containerStopRequests.containsKey(o1p1.getContainer().getExternalId()));
 
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 5bea0b3..56641f8 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -280,7 +280,7 @@ public class StramLocalClusterTest
     c2.waitForHeartbeat(5000);
 
     // purge checkpoints
-    localCluster.dnmgr.monitorHeartbeat(); // checkpoint purging
+    localCluster.dnmgr.monitorHeartbeat(false); // checkpoint purging
 
     Assert.assertEquals("checkpoints " + ptNode1, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode1.checkpoints);
     Assert.assertEquals("checkpoints " + ptNode2, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode2.checkpoints);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index e8ec26c..3c25096 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -241,7 +241,7 @@ public class StramRecoveryTest
     csr.setSinkOperatorPortName("inport1");
     FutureTask<?> lpmf = scm.logicalPlanModification(Lists.newArrayList(cor, csr));
     while (!lpmf.isDone()) {
-      scm.monitorHeartbeat();
+      scm.monitorHeartbeat(false);
     }
     Assert.assertNull(lpmf.get()); // unmask exception, if any
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 2d86618..84622c4 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -769,9 +769,9 @@ public class StreamingContainerManagerTest
 
     o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
     mc1.sendHeartbeat();
-    scm.monitorHeartbeat();
+    scm.monitorHeartbeat(false);
     Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
-    scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+    scm.monitorHeartbeat(false); // committedWindowId updated in next cycle
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
     scm.processEvents();
     Assert.assertEquals("containers at committedWindowId=1", 4, physicalPlan.getContainers().size());
@@ -779,7 +779,7 @@ public class StreamingContainerManagerTest
     // checkpoint window 2
     o1p1mos.checkpointWindowId(2);
     mc1.sendHeartbeat();
-    scm.monitorHeartbeat();
+    scm.monitorHeartbeat(false);
 
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
 
@@ -791,7 +791,7 @@ public class StreamingContainerManagerTest
     mc2.sendHeartbeat();
     mc3.sendHeartbeat();
     mc4.sendHeartbeat();
-    scm.monitorHeartbeat();
+    scm.monitorHeartbeat(false);
 
     // Operators are shutdown when both operators reach window Id 2
     Assert.assertEquals(0, o1p1.getContainer().getOperators().size());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 821f4ea..285aba3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -456,7 +456,7 @@ public class DelayOperatorTest
     }
 
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
-    scm.updateRecoveryCheckpoints(opB1, ctx);
+    scm.updateRecoveryCheckpoints(opB1, ctx, false);
 
     Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
     Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());