You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:22:05 UTC
[46/50] incubator-apex-core git commit: APEXCORE-306 Skip recovery
checkpoint upgrade for entire group during deploy.
APEXCORE-306 Skip recovery checkpoint upgrade for entire group during deploy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5371bc7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5371bc7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5371bc7b
Branch: refs/heads/master
Commit: 5371bc7b7cf33e5d5c96775e241af002fc22d5ed
Parents: b3402be
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sat Jan 23 00:34:42 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Jan 23 00:34:42 2016 -0800
----------------------------------------------------------------------
.../java/com/datatorrent/stram/StreamingContainerManager.java | 4 +++-
.../com/datatorrent/stram/plan/logical/DelayOperatorTest.java | 1 -
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index a687a37..df3bfc4 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1939,6 +1939,7 @@ public class StreamingContainerManager implements PlanContext
commonCheckpoints.addAll(operator.checkpoints);
}
Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+ boolean pendingDeploy = operator.getState() == PTOperator.State.PENDING_DEPLOY;
if (checkpointGroup.size() > 1) {
for (OperatorMeta om : checkpointGroup) {
Collection<PTOperator> operators = plan.getAllOperators(om);
@@ -1949,6 +1950,7 @@ public class StreamingContainerManager implements PlanContext
// visit all downstream operators of the group
ctx.visited.add(groupOper);
groupOpers.add(groupOper);
+ pendingDeploy |= operator.getState() == PTOperator.State.PENDING_DEPLOY;
}
}
// highest common checkpoint
@@ -2004,7 +2006,7 @@ public class StreamingContainerManager implements PlanContext
for (PTOperator groupOper : groupOpers) {
// checkpoint frozen during deployment
- if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+ if (!pendingDeploy || ctx.recovery) {
// remove previous checkpoints
Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 06f184f..cb4222a 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -321,7 +321,6 @@ public class DelayOperatorTest
FibonacciOperator.results.subList(0, 10).toArray());
}
- @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
@Test
public void testFibonacciRecovery1() throws Exception
{