You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:22:05 UTC

[46/50] incubator-apex-core git commit: APEXCORE-306 Skip recovery checkpoint upgrade for entire group during deploy.

APEXCORE-306 Skip recovery checkpoint upgrade for entire group during deploy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5371bc7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5371bc7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5371bc7b

Branch: refs/heads/master
Commit: 5371bc7b7cf33e5d5c96775e241af002fc22d5ed
Parents: b3402be
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sat Jan 23 00:34:42 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Jan 23 00:34:42 2016 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StreamingContainerManager.java    | 4 +++-
 .../com/datatorrent/stram/plan/logical/DelayOperatorTest.java    | 1 -
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index a687a37..df3bfc4 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1939,6 +1939,7 @@ public class StreamingContainerManager implements PlanContext
       commonCheckpoints.addAll(operator.checkpoints);
     }
     Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+    boolean pendingDeploy = operator.getState() == PTOperator.State.PENDING_DEPLOY;
     if (checkpointGroup.size() > 1) {
       for (OperatorMeta om : checkpointGroup) {
         Collection<PTOperator> operators = plan.getAllOperators(om);
@@ -1949,6 +1950,7 @@ public class StreamingContainerManager implements PlanContext
           // visit all downstream operators of the group
           ctx.visited.add(groupOper);
           groupOpers.add(groupOper);
+          pendingDeploy |= operator.getState() == PTOperator.State.PENDING_DEPLOY;
         }
       }
       // highest common checkpoint
@@ -2004,7 +2006,7 @@ public class StreamingContainerManager implements PlanContext
 
     for (PTOperator groupOper : groupOpers) {
       // checkpoint frozen during deployment
-      if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+      if (!pendingDeploy || ctx.recovery) {
         // remove previous checkpoints
         Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
         LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5371bc7b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 06f184f..cb4222a 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -321,7 +321,6 @@ public class DelayOperatorTest
         FibonacciOperator.results.subList(0, 10).toArray());
   }
 
-  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
   @Test
   public void testFibonacciRecovery1() throws Exception
   {