You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/09/04 04:42:19 UTC
incubator-apex-core git commit: APEX-88 #resolve #comment fixed the
async storage agent
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 bff4c5bad -> 711fd0708
APEX-88 #resolve #comment fixed the async storage agent
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/711fd070
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/711fd070
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/711fd070
Branch: refs/heads/devel-3
Commit: 711fd070876c98da91a87664884938224ac73dad
Parents: bff4c5b
Author: Gaurav <ga...@datatorrent.com>
Authored: Thu Sep 3 19:33:05 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Thu Sep 3 19:33:05 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 34 +++++++++-----------
.../common/util/AsyncFSStorageAgentTest.java | 4 +--
.../com/datatorrent/stram/CheckpointTest.java | 2 +-
.../stram/LogicalPlanModificationTest.java | 2 +-
.../com/datatorrent/stram/PartitioningTest.java | 8 ++---
.../stram/StramLocalClusterTest.java | 4 +--
.../datatorrent/stram/StramMiniClusterTest.java | 4 +--
.../datatorrent/stram/StramRecoveryTest.java | 7 ++--
.../stram/StreamingContainerManagerTest.java | 8 ++---
.../stram/debug/TupleRecorderTest.java | 2 +-
.../stram/engine/AtLeastOnceTest.java | 6 ++--
.../stram/engine/AutoMetricTest.java | 2 +-
.../stram/engine/InputOperatorTest.java | 2 +-
.../stram/engine/ProcessingModeTests.java | 6 ++--
.../datatorrent/stram/engine/SliderTest.java | 2 +-
.../com/datatorrent/stram/engine/StatsTest.java | 4 +--
.../stram/engine/StreamingContainerTest.java | 2 +-
.../stram/engine/WindowGeneratorTest.java | 2 +-
.../stram/stream/OiOEndWindowTest.java | 2 +-
.../stram/webapp/StramWebServicesTest.java | 2 +-
20 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index b565447..b89ae59 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -27,9 +27,8 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.netlet.util.DTThrowable;
public class AsyncFSStorageAgent extends FSStorageAgent
{
- private final transient FileSystem fs;
private final transient Configuration conf;
- private final String localBasePath;
+ private final transient String localBasePath;
private boolean syncCheckpoint = false;
@@ -37,32 +36,31 @@ public class AsyncFSStorageAgent extends FSStorageAgent
private AsyncFSStorageAgent()
{
super();
- fs = null;
conf = null;
localBasePath = null;
}
public AsyncFSStorageAgent(String path, Configuration conf)
{
- this(".", path, conf);
- }
-
- public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
- {
super(path, conf);
- if (localBasePath == null) {
- this.localBasePath = "/tmp";
- }
- else {
- this.localBasePath = localBasePath;
- }
- logger.debug("Initialize storage agent with {}.", this.localBasePath);
- this.conf = conf == null ? new Configuration() : conf;
try {
- fs = FileSystem.newInstance(this.conf);
+ File tempFile = File.createTempFile("msp", "msp");
+ this.localBasePath = new File(tempFile.getParent(), "localcheckpoint").getAbsolutePath();
+ tempFile.delete();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
+ logger.info("using {} as the basepath for checkpointing.", this.localBasePath);
+ this.conf = conf == null ? new Configuration() : conf;
+ }
+
+ /*
+ * Storage Agent should internally manage localBasePath. It should not take it from user
+ */
+ @Deprecated
+ public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
+ {
+ this(path, conf);
}
@Override
@@ -122,7 +120,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
@Override
public Object readResolve() throws ObjectStreamException
{
- return new AsyncFSStorageAgent(this.localBasePath, this.path, null);
+ return new AsyncFSStorageAgent(this.path, null);
}
public boolean isSyncCheckpoint()
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
index e7f9f66..a1504e4 100644
--- a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -52,7 +52,7 @@ public class AsyncFSStorageAgentTest
} catch (IOException e) {
throw new RuntimeException(e);
}
- storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null);
+ storageAgent = new AsyncFSStorageAgent(applicationPath, null);
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, applicationPath);
@@ -116,7 +116,7 @@ public class AsyncFSStorageAgentTest
public void testRecovery() throws IOException
{
testSave();
- testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null);
+ testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.applicationPath, null);
testSave();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 4072894..65929fd 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -112,7 +112,7 @@ public class CheckpointTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null);
+ AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null);
storageAgent.setSyncCheckpoint(true);
dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 78a1bd8..efdbd35 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -340,7 +340,7 @@ public class LogicalPlanModificationTest
@Test
public void testExecutionManagerWithAsyncStorageAgent() throws Exception
{
- testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 15ad76e..0b3692a 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -151,7 +151,7 @@ public class PartitioningTest
{
LogicalPlan dag = new LogicalPlan();
File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning");
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
Integer[][] testData = {
{4, 5}
@@ -252,7 +252,7 @@ public class PartitioningTest
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning");
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
CollectorOperator.receivedTuples.clear();
@@ -401,7 +401,7 @@ public class PartitioningTest
{
File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath());
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator());
dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
@@ -423,7 +423,7 @@ public class PartitioningTest
Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
p.checkpoints.add(checkpoint);
p.setRecoveryCheckpoint(checkpoint);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath(), null);
agent.save(inputDeployed, p.getId(), 10L);
agent.copyToHDFS(p.getId(), 10l);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 1881566..6e9eb48 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -69,7 +69,7 @@ public class StramLocalClusterTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
genNode.setMaxTuples(2);
@@ -109,7 +109,7 @@ public class StramLocalClusterTest
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
agent.setSyncCheckpoint(true);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 99478f5..a377a72 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -203,7 +203,7 @@ public class StramMiniClusterTest
LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf);
tb.addFromProperties(dagProps, null);
LogicalPlan dag = createDAG(tb);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
agent.setSyncCheckpoint(true);
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
Configuration yarnConf = new Configuration(yarnCluster.getConfig());
@@ -362,7 +362,7 @@ public class StramMiniClusterTest
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
agent.setSyncCheckpoint(true);
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index ab2092a..ebce32a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -130,7 +130,7 @@ public class StramRecoveryTest
@Test
public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
}
public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
@@ -273,7 +273,7 @@ public class StramRecoveryTest
@Test
public void testContainerManagerWithAsyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
}
@Test
@@ -450,8 +450,7 @@ public class StramRecoveryTest
public void testRestartAppWithAsyncAgent() throws Exception
{
String appPath1 = testMeta.dir + "/app1";
- String checkpointPath = testMeta.dir + "/localPath";
- testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+ testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index bd9699c..2656e8d 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -476,7 +476,7 @@ public class StreamingContainerManagerTest
FileUtils.deleteDirectory(path.getAbsoluteFile());
FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
- AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null);
+ AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null);
long[] windowIds = new long[]{123L, 345L, 234L};
for (long windowId : windowIds) {
@@ -813,7 +813,7 @@ public class StreamingContainerManagerTest
public void testPhysicalPropertyUpdate() throws Exception
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -857,7 +857,7 @@ public class StreamingContainerManagerTest
private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
{
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
StreamingContainerManager dnmgr = lc.dnmgr;
@@ -931,7 +931,7 @@ public class StreamingContainerManagerTest
try {
server.start();
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport", o1.outport, o2.inport1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 718bf1b..b7647a5 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -212,7 +212,7 @@ public class TupleRecorderTest
public void testRecordingFlow() throws Exception
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath(), null));
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024); // 1KB per part
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
index f32be13..5108e03 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
@@ -61,7 +61,7 @@ public class AtLeastOnceTest
int maxTuples = 30;
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/testInputOperatorRecovery").getAbsolutePath();
- AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+ AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
asyncFSStorageAgent.setSyncCheckpoint(true);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
@@ -88,7 +88,7 @@ public class AtLeastOnceTest
int maxTuples = 30;
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
- AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+ AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
asyncFSStorageAgent.setSyncCheckpoint(true);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
@@ -116,7 +116,7 @@ public class AtLeastOnceTest
int maxTuples = 30;
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
- AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+ AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
asyncFSStorageAgent.setSyncCheckpoint(true);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
//dag.getAttributes().get(DAG.HEARTBEAT_INTERVAL_MILLIS, 400);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index 3ca5221..e0bfc37 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -182,7 +182,7 @@ public class AutoMetricTest
public void testMetricPropagation() throws Exception
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 142f45f..6976dee 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -126,7 +126,7 @@ public class InputOperatorTest
{
LogicalPlan dag = new LogicalPlan();
String testWorkDir = new File("target").getAbsolutePath();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir, null));
EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class);
final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 92c057d..28e75fa 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -80,7 +80,7 @@ public class ProcessingModeTests
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
rip.setMaximumTuples(maxTuples);
rip.setSimulateFailure(true);
@@ -103,7 +103,7 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -128,7 +128,7 @@ public class ProcessingModeTests
CollectorOperator.duplicates.clear();
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 26515d4..d16cf19 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -137,7 +137,7 @@ public class SliderTest
{
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/sliderTest").getAbsolutePath();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);
Input input = dag.addOperator("Input", new Input());
Sum sum = dag.addOperator("Sum", new Sum());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 0ededd4..aa32bdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -174,7 +174,7 @@ public class StatsTest
int tupleCount = 10;
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target").getAbsolutePath();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
TestInputStatsListener testInputStatsListener = new TestInputStatsListener();
dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener}));
@@ -230,7 +230,7 @@ public class StatsTest
{
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
testOper.setMaxTuples(maxTuples);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 7d37429..70c896c 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -47,7 +47,7 @@ public class StreamingContainerTest
{
LogicalPlan lp = new LogicalPlan();
String workingDir = new File("target/testCommitted").getAbsolutePath();
- lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 4f7b842..4665d79 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -306,7 +306,7 @@ public class WindowGeneratorTest
logger.info("Testing Out of Sequence Error");
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/testOutofSequenceError").getAbsolutePath();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator());
MyLogger ml = dag.addOperator("logger", new MyLogger());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index a4e9c43..365dd03 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -97,7 +97,7 @@ public class OiOEndWindowTest
{
LogicalPlan lp = new LogicalPlan();
String workingDir = new File("target/validateOiOImplementation").getAbsolutePath();
- lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator());
FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator());
SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9b8f0b2..b0680b8 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -129,7 +129,7 @@ public class StramWebServicesTest extends JerseyTest
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath();
dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir);
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag);
appContext = new TestAppContext();