You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/09/04 04:42:19 UTC

incubator-apex-core git commit: APEX-88 #resolve #comment fixed the async storage agent

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 bff4c5bad -> 711fd0708


APEX-88 #resolve #comment fixed the async storage agent


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/711fd070
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/711fd070
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/711fd070

Branch: refs/heads/devel-3
Commit: 711fd070876c98da91a87664884938224ac73dad
Parents: bff4c5b
Author: Gaurav <ga...@datatorrent.com>
Authored: Thu Sep 3 19:33:05 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Thu Sep 3 19:33:05 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        | 34 +++++++++-----------
 .../common/util/AsyncFSStorageAgentTest.java    |  4 +--
 .../com/datatorrent/stram/CheckpointTest.java   |  2 +-
 .../stram/LogicalPlanModificationTest.java      |  2 +-
 .../com/datatorrent/stram/PartitioningTest.java |  8 ++---
 .../stram/StramLocalClusterTest.java            |  4 +--
 .../datatorrent/stram/StramMiniClusterTest.java |  4 +--
 .../datatorrent/stram/StramRecoveryTest.java    |  7 ++--
 .../stram/StreamingContainerManagerTest.java    |  8 ++---
 .../stram/debug/TupleRecorderTest.java          |  2 +-
 .../stram/engine/AtLeastOnceTest.java           |  6 ++--
 .../stram/engine/AutoMetricTest.java            |  2 +-
 .../stram/engine/InputOperatorTest.java         |  2 +-
 .../stram/engine/ProcessingModeTests.java       |  6 ++--
 .../datatorrent/stram/engine/SliderTest.java    |  2 +-
 .../com/datatorrent/stram/engine/StatsTest.java |  4 +--
 .../stram/engine/StreamingContainerTest.java    |  2 +-
 .../stram/engine/WindowGeneratorTest.java       |  2 +-
 .../stram/stream/OiOEndWindowTest.java          |  2 +-
 .../stram/webapp/StramWebServicesTest.java      |  2 +-
 20 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index b565447..b89ae59 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -27,9 +27,8 @@ import org.slf4j.LoggerFactory;
 import com.datatorrent.netlet.util.DTThrowable;
 public class AsyncFSStorageAgent extends FSStorageAgent
 {
-  private final transient FileSystem fs;
   private final transient Configuration conf;
-  private final String localBasePath;
+  private final transient String localBasePath;
 
   private boolean syncCheckpoint = false;
 
@@ -37,32 +36,31 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   private AsyncFSStorageAgent()
   {
     super();
-    fs = null;
     conf = null;
     localBasePath = null;
   }
 
   public AsyncFSStorageAgent(String path, Configuration conf)
   {
-    this(".", path, conf);
-  }
-
-  public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
-  {
     super(path, conf);
-    if (localBasePath == null) {
-      this.localBasePath = "/tmp";
-    }
-    else {
-      this.localBasePath = localBasePath;
-    }
-    logger.debug("Initialize storage agent with {}.", this.localBasePath);
-    this.conf = conf == null ? new Configuration() : conf;
     try {
-      fs = FileSystem.newInstance(this.conf);
+      File tempFile = File.createTempFile("msp", "msp");
+      this.localBasePath = new File(tempFile.getParent(), "localcheckpoint").getAbsolutePath();
+      tempFile.delete();
     } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
+    logger.info("using {} as the basepath for checkpointing.", this.localBasePath);
+    this.conf = conf == null ? new Configuration() : conf;
+  }
+
+  /*
+   * Storage Agent should internally manage localBasePath. It should not take it from user
+   */
+  @Deprecated
+  public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
+  {
+    this(path, conf);
   }
 
   @Override
@@ -122,7 +120,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public Object readResolve() throws ObjectStreamException
   {
-    return new AsyncFSStorageAgent(this.localBasePath, this.path, null);
+    return new AsyncFSStorageAgent(this.path, null);
   }
 
   public boolean isSyncCheckpoint()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
index e7f9f66..a1504e4 100644
--- a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -52,7 +52,7 @@ public class AsyncFSStorageAgentTest
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null);
+      storageAgent = new AsyncFSStorageAgent(applicationPath, null);
 
       Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
       attributes.put(DAG.APPLICATION_PATH, applicationPath);
@@ -116,7 +116,7 @@ public class AsyncFSStorageAgentTest
   public void testRecovery() throws IOException
   {
     testSave();
-    testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null);
+    testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.applicationPath, null);
     testSave();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 4072894..65929fd 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -112,7 +112,7 @@ public class CheckpointTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null);
+    AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null);
     storageAgent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
     dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 78a1bd8..efdbd35 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -340,7 +340,7 @@ public class LogicalPlanModificationTest
   @Test
   public void testExecutionManagerWithAsyncStorageAgent() throws Exception
   {
-    testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 15ad76e..0b3692a 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -151,7 +151,7 @@ public class PartitioningTest
   {
     LogicalPlan dag = new LogicalPlan();
     File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning");
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
 
     Integer[][] testData = {
       {4, 5}
@@ -252,7 +252,7 @@ public class PartitioningTest
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
     File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning");
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
 
     CollectorOperator.receivedTuples.clear();
 
@@ -401,7 +401,7 @@ public class PartitioningTest
     {
       File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
       dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath());
-      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
 
       PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator());
       dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
@@ -423,7 +423,7 @@ public class PartitioningTest
         Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
         p.checkpoints.add(checkpoint);
         p.setRecoveryCheckpoint(checkpoint);
-        AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null);
+        AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath(), null);
         agent.save(inputDeployed, p.getId(), 10L);
         agent.copyToHDFS(p.getId(), 10l);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 1881566..6e9eb48 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -69,7 +69,7 @@ public class StramLocalClusterTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
 
     TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
     genNode.setMaxTuples(2);
@@ -109,7 +109,7 @@ public class StramLocalClusterTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
     agent.setSyncCheckpoint(true);
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 99478f5..a377a72 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -203,7 +203,7 @@ public class StramMiniClusterTest
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf);
     tb.addFromProperties(dagProps, null);
     LogicalPlan dag = createDAG(tb);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
     agent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     Configuration yarnConf = new Configuration(yarnCluster.getConfig());
@@ -362,7 +362,7 @@ public class StramMiniClusterTest
 
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
     agent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index ab2092a..ebce32a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -130,7 +130,7 @@ public class StramRecoveryTest
   @Test
   public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
   }
 
   public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
@@ -273,7 +273,7 @@ public class StramRecoveryTest
   @Test
   public void testContainerManagerWithAsyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
   }
 
   @Test
@@ -450,8 +450,7 @@ public class StramRecoveryTest
   public void testRestartAppWithAsyncAgent() throws Exception
   {
     String appPath1 = testMeta.dir + "/app1";
-    String checkpointPath = testMeta.dir + "/localPath";
-    testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+    testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index bd9699c..2656e8d 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -476,7 +476,7 @@ public class StreamingContainerManagerTest
     FileUtils.deleteDirectory(path.getAbsoluteFile());
     FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
 
-    AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null);
+    AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null);
 
     long[] windowIds = new long[]{123L, 345L, 234L};
     for (long windowId : windowIds) {
@@ -813,7 +813,7 @@ public class StreamingContainerManagerTest
   public void testPhysicalPropertyUpdate() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -857,7 +857,7 @@ public class StreamingContainerManagerTest
 
   private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
   {
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     StramLocalCluster lc = new StramLocalCluster(dag);
     lc.runAsync();
     StreamingContainerManager dnmgr = lc.dnmgr;
@@ -931,7 +931,7 @@ public class StreamingContainerManagerTest
     try {
       server.start();
       LogicalPlan dag = new LogicalPlan();
-      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
       TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
       GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
       dag.addStream("o1.outport", o1.outport, o2.inport1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 718bf1b..b7647a5 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -212,7 +212,7 @@ public class TupleRecorderTest
   public void testRecordingFlow() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath(), null));
 
     dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
     dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024);  // 1KB per part

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
index f32be13..5108e03 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
@@ -61,7 +61,7 @@ public class AtLeastOnceTest
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/testInputOperatorRecovery").getAbsolutePath();
-    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
     asyncFSStorageAgent.setSyncCheckpoint(true);
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
@@ -88,7 +88,7 @@ public class AtLeastOnceTest
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
-    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
     asyncFSStorageAgent.setSyncCheckpoint(true);
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
@@ -116,7 +116,7 @@ public class AtLeastOnceTest
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
-    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null);
     asyncFSStorageAgent.setSyncCheckpoint(true);
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     //dag.getAttributes().get(DAG.HEARTBEAT_INTERVAL_MILLIS, 400);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index 3ca5221..e0bfc37 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -182,7 +182,7 @@ public class AutoMetricTest
   public void testMetricPropagation() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 142f45f..6976dee 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -126,7 +126,7 @@ public class InputOperatorTest
   {
     LogicalPlan dag = new LogicalPlan();
     String testWorkDir = new File("target").getAbsolutePath();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir, null));
     EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class);
     final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 92c057d..28e75fa 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -80,7 +80,7 @@ public class ProcessingModeTests
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
     rip.setMaximumTuples(maxTuples);
     rip.setSimulateFailure(true);
@@ -103,7 +103,7 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -128,7 +128,7 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 26515d4..d16cf19 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -137,7 +137,7 @@ public class SliderTest
   {
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/sliderTest").getAbsolutePath();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);
     Input input = dag.addOperator("Input", new Input());
     Sum sum = dag.addOperator("Sum", new Sum());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 0ededd4..aa32bdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -174,7 +174,7 @@ public class StatsTest
     int tupleCount = 10;
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target").getAbsolutePath();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
     TestInputStatsListener testInputStatsListener = new TestInputStatsListener();
     dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener}));
@@ -230,7 +230,7 @@ public class StatsTest
   {
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
     TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
     testOper.setMaxTuples(maxTuples);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 7d37429..70c896c 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -47,7 +47,7 @@ public class StreamingContainerTest
   {
     LogicalPlan lp = new LogicalPlan();
     String workingDir = new File("target/testCommitted").getAbsolutePath();
-    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
     CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 4f7b842..4665d79 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -306,7 +306,7 @@ public class WindowGeneratorTest
     logger.info("Testing Out of Sequence Error");
     LogicalPlan dag = new LogicalPlan();
     String workingDir = new File("target/testOutofSequenceError").getAbsolutePath();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator());
     MyLogger ml = dag.addOperator("logger", new MyLogger());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index a4e9c43..365dd03 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -97,7 +97,7 @@ public class OiOEndWindowTest
   {
     LogicalPlan lp = new LogicalPlan();
     String workingDir = new File("target/validateOiOImplementation").getAbsolutePath();
-    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator());
     FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator());
     SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9b8f0b2..b0680b8 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -129,7 +129,7 @@ public class StramWebServicesTest extends JerseyTest
         LogicalPlan dag = new LogicalPlan();
         String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath();
         dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir);
-        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
         final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag);
 
         appContext = new TestAppContext();