You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/02 21:12:21 UTC
apex-core git commit: APEXCORE-426 Reuse the running container,
when the Stram restarts.
Repository: apex-core
Updated Branches:
refs/heads/master 58930cc57 -> 3b660c9c1
APEXCORE-426 Reuse the running container, when the Stram restarts.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3b660c9c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3b660c9c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3b660c9c
Branch: refs/heads/master
Commit: 3b660c9c1dd639c49ae300fd3f70397d3f794d12
Parents: 58930cc
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Fri Nov 4 17:31:59 2016 -0700
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Thu Mar 2 13:04:33 2017 -0800
----------------------------------------------------------------------
.../java/com/datatorrent/stram/StramClient.java | 2 +
.../datatorrent/stram/StramLocalCluster.java | 2 +-
.../stram/StreamingAppMasterService.java | 38 +++++++------------
.../stram/StreamingContainerManager.java | 16 ++++----
.../com/datatorrent/stram/CheckpointTest.java | 39 +++++++++-----------
.../stram/StramLocalClusterTest.java | 2 +-
.../datatorrent/stram/StramRecoveryTest.java | 2 +-
.../stram/StreamingContainerManagerTest.java | 8 ++--
.../stram/plan/logical/DelayOperatorTest.java | 2 +-
9 files changed, 50 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 8b78c14..dad42e3 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -392,6 +392,8 @@ public class StramClient
//appContext.setMaxAppAttempts(1); // no retries until Stram is HA
}
+ appContext.setKeepContainersAcrossApplicationAttempts(true);
+
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index e5d855b..ff61868 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -501,7 +501,7 @@ public class StramLocalCluster implements Runnable, Controller
if (heartbeatMonitoringEnabled) {
// monitor child containers
- dnmgr.monitorHeartbeat();
+ dnmgr.monitorHeartbeat(false);
}
if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 3898dbc..c0e09ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -27,7 +27,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -63,9 +62,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -75,7 +71,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -740,9 +735,12 @@ public class StreamingAppMasterService extends CompositeService
clientRMService.stop();
}
- // check for previously allocated containers
- // as of 2.2, containers won't survive AM restart, but this will change in the future - YARN-1490
- checkContainerStatus();
+ List<Container> containers = response.getContainersFromPreviousAttempts();
+
+ // Running containers might take a while to register with the new app master and send the heartbeat signal.
+ int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
+
+ previouslyAllocatedContainers(containers);
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
@@ -1028,7 +1026,9 @@ public class StreamingAppMasterService extends CompositeService
loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
// monitor child containers
- dnmgr.monitorHeartbeat();
+ dnmgr.monitorHeartbeat(waitForRecovery > 0);
+
+ waitForRecovery = Math.max(waitForRecovery - 1, 0);
}
finishApplication(finalStatus);
@@ -1068,22 +1068,12 @@ public class StreamingAppMasterService extends CompositeService
* Check for containers that were allocated in a previous attempt.
* If the containers are still alive, wait for them to check in via heartbeat.
*/
- private void checkContainerStatus()
+ private void previouslyAllocatedContainers(List<Container> containers)
{
- Collection<StreamingContainerAgent> containers = this.dnmgr.getContainerAgents();
- for (StreamingContainerAgent ca : containers) {
- ContainerId containerId = ConverterUtils.toContainerId(ca.container.getExternalId());
- NodeId nodeId = ConverterUtils.toNodeId(ca.container.host);
-
- // put container back into the allocated list
- org.apache.hadoop.yarn.api.records.Token containerToken = null;
- Resource resource = Resource.newInstance(ca.container.getAllocatedMemoryMB(), ca.container.getAllocatedVCores());
- Priority priority = Priority.newInstance(ca.container.getResourceRequestPriority());
- Container yarnContainer = Container.newInstance(containerId, nodeId, ca.container.nodeHttpAddress, resource, priority, containerToken);
- this.allocatedContainers.put(containerId.toString(), new AllocatedContainer(yarnContainer));
-
- // check the status
- nmClient.getContainerStatusAsync(containerId, nodeId);
+ for (Container container : containers) {
+ this.allocatedContainers.put(container.getId().toString(), new AllocatedContainer(container));
+ // check the status
+ nmClient.getContainerStatusAsync(container.getId(), container.getNodeId());
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index dfbc7d1..fe27be9 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -749,7 +749,7 @@ public class StreamingContainerManager implements PlanContext
* Check periodically that deployed containers phone home.
* Run from the master main loop (single threaded access).
*/
- public void monitorHeartbeat()
+ public void monitorHeartbeat(boolean waitForRecovery)
{
long currentTms = clock.getTime();
@@ -798,7 +798,7 @@ public class StreamingContainerManager implements PlanContext
// events that may modify the plan
processEvents();
- committedWindowId = updateCheckpoints(false);
+ committedWindowId = updateCheckpoints(waitForRecovery);
calculateEndWindowStats();
if (this.vars.enableStatsRecording) {
recordStats(currentTms);
@@ -1138,7 +1138,7 @@ public class StreamingContainerManager implements PlanContext
// resolve dependencies
UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
for (PTOperator oper : cs.container.getOperators()) {
- updateRecoveryCheckpoints(oper, ctx);
+ updateRecoveryCheckpoints(oper, ctx, false);
}
includeLocalUpstreamOperators(ctx);
@@ -1170,7 +1170,7 @@ public class StreamingContainerManager implements PlanContext
}
if (!newOperators.isEmpty()) {
for (PTOperator oper : newOperators) {
- updateRecoveryCheckpoints(oper, ctx);
+ updateRecoveryCheckpoints(oper, ctx, false);
}
}
} while (!newOperators.isEmpty());
@@ -2022,7 +2022,7 @@ public class StreamingContainerManager implements PlanContext
* @param operator Operator instance for which to find recovery checkpoint
* @param ctx Context into which to collect traversal info
*/
- public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx)
+ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx, boolean recovery)
{
if (operator.getRecoveryCheckpoint().windowId < ctx.committedWindowId.longValue()) {
ctx.committedWindowId.setValue(operator.getRecoveryCheckpoint().windowId);
@@ -2031,7 +2031,7 @@ public class StreamingContainerManager implements PlanContext
if (operator.getState() == PTOperator.State.ACTIVE &&
(ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
// if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
- if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
+ if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId && !recovery) {
LOG.warn("Marking operator {} blocked committed window {}, recovery window {}, current time {}, last window id change time {}, window processing timeout millis {}",
operator,
Codec.getStringWindowId(ctx.committedWindowId.longValue()),
@@ -2096,7 +2096,7 @@ public class StreamingContainerManager implements PlanContext
}
if (!ctx.visited.contains(sinkOperator)) {
// downstream traversal
- updateRecoveryCheckpoints(sinkOperator, ctx);
+ updateRecoveryCheckpoints(sinkOperator, ctx, recovery);
}
// recovery window id cannot move backwards
// when dynamically adding new operators
@@ -2196,7 +2196,7 @@ public class StreamingContainerManager implements PlanContext
if (operators != null) {
for (PTOperator operator : operators) {
operatorCount++;
- updateRecoveryCheckpoints(operator, ctx);
+ updateRecoveryCheckpoints(operator, ctx, recovery);
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index db939cd..d7f96d4 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -216,11 +216,11 @@ public class CheckpointTest
Assert.assertEquals("", PTOperator.State.PENDING_DEPLOY, oper.getState());
}
- dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("no checkpoints " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("no checkpoints " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("number dependencies " + ctx.visited, 3, ctx.visited.size());
@@ -231,17 +231,17 @@ public class CheckpointTest
o1p1.checkpoints.add(cp3);
o1p1.checkpoints.add(cp5);
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
o2p1.checkpoints.add(new Checkpoint(3L, 0, 0));
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
// set leaf operator checkpoint
dnm.addCheckpoint(o3SLp1, cp5);
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());
@@ -249,20 +249,20 @@ public class CheckpointTest
for (PTOperator oper : plan.getAllOperators().values()) {
oper.setState(PTOperator.State.ACTIVE);
}
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + o2p1, cp3, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + o3SLp1, cp5, o3SLp1.getRecoveryCheckpoint());
Assert.assertNull("checkpoint null for stateless operator " + o3SLp1, o3SLp1.stats.checkpointStats);
o2p1.checkpoints.add(cp4);
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + o2p1, cp4, o2p1.getRecoveryCheckpoint());
o1p1.checkpoints.add(1, cp4);
Assert.assertEquals(o1p1.checkpoints, getCheckpoints(3L, 4L, 5L));
- dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock));
+ dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false);
Assert.assertEquals("checkpoint " + o1p1, cp4, o1p1.getRecoveryCheckpoint());
Assert.assertEquals(o1p1.checkpoints, getCheckpoints(4L, 5L));
@@ -315,7 +315,7 @@ public class CheckpointTest
o4p1.checkpoints.add(leafCheckpoint);
UpdateCheckpointsContext ctx;
- dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
+ dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()), false);
Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());
@@ -376,7 +376,7 @@ public class CheckpointTest
PTOperator o2p1 = partitions.get(0);
UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertTrue("no blocked operators", ctx.blocked.isEmpty());
o1p1.stats.statsRevs.checkout();
@@ -387,25 +387,25 @@ public class CheckpointTest
clock.time = o1p1.stats.windowProcessingTimeoutMillis + 1;
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked);
// assign future activation window (state-less or at-most-once).
Checkpoint cp2 = o2p1.getRecoveryCheckpoint();
o2p1.setRecoveryCheckpoint(new Checkpoint(o1p1.getRecoveryCheckpoint().windowId + 1, cp2.applicationWindowCount, cp2.checkpointWindowCount));
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("no operators blocked (o2 activation window ahead)", Sets.newHashSet(), ctx.blocked);
// reset to blocked
o2p1.setRecoveryCheckpoint(cp2);
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked);
clock.time++;
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1, o2p1), ctx.blocked);
o2p1.stats.statsRevs.checkout();
@@ -413,16 +413,13 @@ public class CheckpointTest
o2p1.stats.statsRevs.commit();
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1), ctx.blocked);
clock.time--;
ctx = new UpdateCheckpointsContext(clock);
- dnm.updateRecoveryCheckpoints(o1p1, ctx);
+ dnm.updateRecoveryCheckpoints(o1p1, ctx, false);
Assert.assertEquals("operators blocked", Sets.newHashSet(), ctx.blocked);
-
-
-
}
@Test
@@ -473,13 +470,13 @@ public class CheckpointTest
mc1.sendHeartbeat();
Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
Assert.assertEquals(10, o1p1.stats.lastWindowIdChangeTms);
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
Assert.assertTrue(scm.containerStopRequests.isEmpty());
clock.time++;
mc1.sendHeartbeat();
Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
Assert.assertTrue(scm.containerStopRequests.containsKey(o1p1.getContainer().getExternalId()));
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 5bea0b3..56641f8 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -280,7 +280,7 @@ public class StramLocalClusterTest
c2.waitForHeartbeat(5000);
// purge checkpoints
- localCluster.dnmgr.monitorHeartbeat(); // checkpoint purging
+ localCluster.dnmgr.monitorHeartbeat(false); // checkpoint purging
Assert.assertEquals("checkpoints " + ptNode1, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode1.checkpoints);
Assert.assertEquals("checkpoints " + ptNode2, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode2.checkpoints);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index e8ec26c..3c25096 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -241,7 +241,7 @@ public class StramRecoveryTest
csr.setSinkOperatorPortName("inport1");
FutureTask<?> lpmf = scm.logicalPlanModification(Lists.newArrayList(cor, csr));
while (!lpmf.isDone()) {
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
}
Assert.assertNull(lpmf.get()); // unmask exception, if any
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 2d86618..84622c4 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -769,9 +769,9 @@ public class StreamingContainerManagerTest
o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
mc1.sendHeartbeat();
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
- scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+ scm.monitorHeartbeat(false); // committedWindowId updated in next cycle
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
scm.processEvents();
Assert.assertEquals("containers at committedWindowId=1", 4, physicalPlan.getContainers().size());
@@ -779,7 +779,7 @@ public class StreamingContainerManagerTest
// checkpoint window 2
o1p1mos.checkpointWindowId(2);
mc1.sendHeartbeat();
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
@@ -791,7 +791,7 @@ public class StreamingContainerManagerTest
mc2.sendHeartbeat();
mc3.sendHeartbeat();
mc4.sendHeartbeat();
- scm.monitorHeartbeat();
+ scm.monitorHeartbeat(false);
// Operators are shutdown when both operators reach window Id 2
Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 821f4ea..285aba3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -456,7 +456,7 @@ public class DelayOperatorTest
}
UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
- scm.updateRecoveryCheckpoints(opB1, ctx);
+ scm.updateRecoveryCheckpoints(opB1, ctx, false);
Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());