You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:11 UTC

[40/50] incubator-apex-core git commit: Fix test failures due to reuse of previous checkpoints.

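Fix test failures due to reuse of previous checkpoints.

The processing-mode tests now pass the TestMeta rule's directory (testMeta.dir) to AsyncFSStorageAgent instead of a fixed target/<testName> path.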


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d19fa66e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d19fa66e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d19fa66e

Branch: refs/heads/master
Commit: d19fa66edd31e8b8cb481415fcda86bb2c32f6fc
Parents: 76faf86
Author: thomas <th...@datatorrent.com>
Authored: Thu Aug 20 18:59:01 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 20 18:59:01 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java            |  1 +
 .../java/com/datatorrent/stram/StramClient.java     |  1 -
 .../stram/StreamingContainerManagerTest.java        |  2 ++
 .../datatorrent/stram/engine/AtMostOnceTest.java    |  2 +-
 .../stram/engine/ProcessingModeTests.java           | 16 +++++++---------
 .../stram/engine/RecoverableInputOperator.java      | 10 +++++-----
 6 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index d5de61c..2ab6771 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -36,6 +36,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
 
   private boolean syncCheckpoint = false;
 
+  @SuppressWarnings("unused")
   private AsyncFSStorageAgent()
   {
     super();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 8a8baf3..db36ef6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -54,7 +54,6 @@ import org.apache.log4j.DTLoggerFactory;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
-import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
 import com.datatorrent.stram.engine.StreamingContainer;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 89f2878..bd9699c 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -727,6 +727,8 @@ public class StreamingContainerManagerTest
     // deploy all containers
     for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
       ce.getValue().deploy();
+    }
+    for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
       // skip buffer server purge in monitorHeartbeat
       ce.getKey().bufferServerAddress = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index 41e0bd9..1205f30 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -51,7 +51,7 @@ public class AtMostOnceTest extends ProcessingModeTests
     Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty());
   }
 
-  //@Test
+  @Test
   @Override
   public void testLinearOperatorRecovery() throws Exception
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 0393394..92c057d 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -18,16 +18,17 @@ package com.datatorrent.stram.engine;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import static java.lang.Thread.sleep;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,11 +36,11 @@ import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.Operator.ProcessingMode;
-
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
@@ -48,6 +49,7 @@ import com.datatorrent.stram.tuple.Tuple;
  */
 public class ProcessingModeTests
 {
+  @Rule public TestMeta testMeta = new TestMeta();
   ProcessingMode processingMode;
   int maxTuples = 30;
 
@@ -78,8 +80,7 @@ public class ProcessingModeTests
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
-    String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
     rip.setMaximumTuples(maxTuples);
     rip.setSimulateFailure(true);
@@ -102,8 +103,7 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
-    String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -128,8 +128,7 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
-    String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -203,7 +202,6 @@ public class ProcessingModeTests
       }
     }
 
-    private static final long serialVersionUID = 201404161447L;
   }
 
   public static class MultiInputOperator implements Operator

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
index 510fbd5..4cf8274 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator;
 
 import com.datatorrent.bufferserver.util.Codec;
 
@@ -37,7 +36,7 @@ import com.datatorrent.bufferserver.util.Codec;
 public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener
 {
   public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
-  long checkpointedWindowId;
+  private long checkpointedWindowId;
   boolean firstRun = true;
   transient boolean first;
   transient long windowId;
@@ -92,7 +91,8 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   @Override
   public void setup(OperatorContext context)
   {
-    firstRun &= checkpointedWindowId == 0;
+    firstRun = (checkpointedWindowId == 0);
+    logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
   }
 
   @Override
@@ -105,6 +105,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   {
     if (checkpointedWindowId == 0) {
       checkpointedWindowId = windowId;
+      logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
     }
 
     logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId));
@@ -113,8 +114,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   @Override
   public void committed(long windowId)
   {
-    logger.debug("{} committed at {}", this, Codec.getStringWindowId(windowId));
-
+    logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId), firstRun, Codec.getStringWindowId(checkpointedWindowId));
     if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > checkpointedWindowId) {
       throw new RuntimeException("Failure Simulation from " + this);
     }