You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:11 UTC
[40/50] incubator-apex-core git commit: Fix test failures due to
reuse of previous checkpoints.
Fix test failures due to reuse of previous checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d19fa66e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d19fa66e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d19fa66e
Branch: refs/heads/master
Commit: d19fa66edd31e8b8cb481415fcda86bb2c32f6fc
Parents: 76faf86
Author: thomas <th...@datatorrent.com>
Authored: Thu Aug 20 18:59:01 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 20 18:59:01 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 1 +
.../java/com/datatorrent/stram/StramClient.java | 1 -
.../stram/StreamingContainerManagerTest.java | 2 ++
.../datatorrent/stram/engine/AtMostOnceTest.java | 2 +-
.../stram/engine/ProcessingModeTests.java | 16 +++++++---------
.../stram/engine/RecoverableInputOperator.java | 10 +++++-----
6 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index d5de61c..2ab6771 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -36,6 +36,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
private boolean syncCheckpoint = false;
+ @SuppressWarnings("unused")
private AsyncFSStorageAgent()
{
super();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 8a8baf3..db36ef6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -54,7 +54,6 @@ import org.apache.log4j.DTLoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
-import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
import com.datatorrent.stram.engine.StreamingContainer;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 89f2878..bd9699c 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -727,6 +727,8 @@ public class StreamingContainerManagerTest
// deploy all containers
for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
ce.getValue().deploy();
+ }
+ for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
// skip buffer server purge in monitorHeartbeat
ce.getKey().bufferServerAddress = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index 41e0bd9..1205f30 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -51,7 +51,7 @@ public class AtMostOnceTest extends ProcessingModeTests
Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty());
}
- //@Test
+ @Test
@Override
public void testLinearOperatorRecovery() throws Exception
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 0393394..92c057d 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -18,16 +18,17 @@ package com.datatorrent.stram.engine;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
+
import static java.lang.Thread.sleep;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,11 +36,11 @@ import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator.ProcessingMode;
-
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -48,6 +49,7 @@ import com.datatorrent.stram.tuple.Tuple;
*/
public class ProcessingModeTests
{
+ @Rule public TestMeta testMeta = new TestMeta();
ProcessingMode processingMode;
int maxTuples = 30;
@@ -78,8 +80,7 @@ public class ProcessingModeTests
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
- String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
rip.setMaximumTuples(maxTuples);
rip.setSimulateFailure(true);
@@ -102,8 +103,7 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
- String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -128,8 +128,7 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
- String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -203,7 +202,6 @@ public class ProcessingModeTests
}
}
- private static final long serialVersionUID = 201404161447L;
}
public static class MultiInputOperator implements Operator
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
index 510fbd5..4cf8274 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.util.Codec;
@@ -37,7 +36,7 @@ import com.datatorrent.bufferserver.util.Codec;
public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener
{
public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
- long checkpointedWindowId;
+ private long checkpointedWindowId;
boolean firstRun = true;
transient boolean first;
transient long windowId;
@@ -92,7 +91,8 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
@Override
public void setup(OperatorContext context)
{
- firstRun &= checkpointedWindowId == 0;
+ firstRun = (checkpointedWindowId == 0);
+ logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
}
@Override
@@ -105,6 +105,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
{
if (checkpointedWindowId == 0) {
checkpointedWindowId = windowId;
+ logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
}
logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId));
@@ -113,8 +114,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
@Override
public void committed(long windowId)
{
- logger.debug("{} committed at {}", this, Codec.getStringWindowId(windowId));
-
+ logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId), firstRun, Codec.getStringWindowId(checkpointedWindowId));
if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > checkpointedWindowId) {
throw new RuntimeException("Failure Simulation from " + this);
}