You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:37 UTC

[06/50] incubator-apex-core git commit: writing checkpoints async

writing checkpoints async


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/29eb6c37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/29eb6c37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/29eb6c37

Branch: refs/heads/master
Commit: 29eb6c377e92242c540d9a4d8be43a1fe05b7ac2
Parents: 66a75e0
Author: Gaurav <ga...@datatorrent.com>
Authored: Thu Jul 30 11:15:24 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Tue Aug 4 16:28:17 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        | 111 ++++++++++++++++
 .../datatorrent/common/util/FSStorageAgent.java |   3 +-
 .../common/codec/JsonStreamCodecTest.java       |  15 ++-
 .../common/util/AsyncFSStorageAgentTest.java    | 133 +++++++++++++++++++
 .../java/com/datatorrent/stram/StramClient.java |   5 +-
 .../datatorrent/stram/StramLocalCluster.java    |   4 +-
 .../stram/StreamingAppMasterService.java        |   2 +-
 .../stram/StreamingContainerManager.java        |  10 +-
 .../java/com/datatorrent/stram/engine/Node.java |  64 ++++++++-
 .../stram/plan/physical/PhysicalPlan.java       |   8 +-
 .../com/datatorrent/stram/CheckpointTest.java   |  11 +-
 .../stram/LogicalPlanModificationTest.java      |  22 ++-
 .../com/datatorrent/stram/PartitioningTest.java |  26 +++-
 .../stram/StramLocalClusterTest.java            |  22 ++-
 .../datatorrent/stram/StramMiniClusterTest.java |   9 +-
 .../datatorrent/stram/StramRecoveryTest.java    |  56 ++++++--
 .../stram/StreamingContainerManagerTest.java    |  45 ++++++-
 .../stram/debug/TupleRecorderTest.java          |   3 +
 .../stram/engine/AutoMetricTest.java            |   2 +
 .../stram/engine/InputOperatorTest.java         |   5 +-
 .../stram/engine/ProcessingModeTests.java       |   9 ++
 .../datatorrent/stram/engine/SliderTest.java    |   5 +
 .../com/datatorrent/stram/engine/StatsTest.java |  10 +-
 .../stram/engine/WindowGeneratorTest.java       |  11 +-
 .../stram/webapp/StramWebServicesTest.java      |   6 +-
 25 files changed, 527 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
new file mode 100644
index 0000000..d5de61c
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectStreamException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncFSStorageAgent extends FSStorageAgent
+{
+  private final transient FileSystem fs;
+  private final transient Configuration conf;
+  private final String localBasePath;
+
+  private boolean syncCheckpoint = false;
+
+  private AsyncFSStorageAgent()
+  {
+    super();
+    fs = null;
+    conf = null;
+    localBasePath = null;
+  }
+
+  public AsyncFSStorageAgent(String path, Configuration conf)
+  {
+    this(".", path, conf);
+  }
+
+  public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
+  {
+    super(path, conf);
+    if (localBasePath == null) {
+      this.localBasePath = "/tmp";
+    }
+    else {
+      this.localBasePath = localBasePath;
+    }
+    logger.debug("Initialize storage agent with {}.", this.localBasePath);
+    this.conf = conf == null ? new Configuration() : conf;
+    try {
+      fs = FileSystem.newInstance(this.conf);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void save(final Object object, final int operatorId, final long windowId) throws IOException
+  {
+    String operatorIdStr = String.valueOf(operatorId);
+    File directory = new File(localBasePath, operatorIdStr);
+    if (!directory.exists()) {
+      directory.mkdirs();
+    }
+    try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(windowId)))) {
+      store(stream, object);
+    }
+  }
+
+  public void copyToHDFS(final int operatorId, final long windowId) throws IOException
+  {
+    String operatorIdStr = String.valueOf(operatorId);
+    File directory = new File(localBasePath, operatorIdStr);
+    String window = Long.toHexString(windowId);
+    Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE);
+    FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf);
+    fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+  }
+
+  @Override
+  public Object readResolve() throws ObjectStreamException
+  {
+    return new AsyncFSStorageAgent(this.localBasePath, this.path, null);
+  }
+
+  public boolean isSyncCheckpoint()
+  {
+    return syncCheckpoint;
+  }
+
+  public void setSyncCheckpoint(boolean syncCheckpoint)
+  {
+    this.syncCheckpoint = syncCheckpoint;
+  }
+
+  private static final long serialVersionUID = 201507241610L;
+  private static final Logger logger = LoggerFactory.getLogger(AsyncFSStorageAgent.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index 31b537d..14275fa 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -52,8 +52,7 @@ public class FSStorageAgent implements StorageAgent, Serializable
     kryo = new Kryo();
   }
 
-  @SuppressWarnings("unused")
-  private FSStorageAgent()
+  protected FSStorageAgent()
   {
     path = null;
     fileContext = null;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
index e0a5f01..a9303bc 100644
--- a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
+++ b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java
@@ -1,14 +1,17 @@
 /**
  * Copyright (C) 2015 DataTorrent, Inc.
  *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package com.datatorrent.common.codec;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
new file mode 100644
index 0000000..e7f9f66
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+public class AsyncFSStorageAgentTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+    String basePath;
+    AsyncFSStorageAgent storageAgent;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      basePath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      applicationPath = basePath + "/app";
+      try {
+        FileUtils.forceMkdir(new File(basePath));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null);
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSave() throws IOException
+  {
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    testMeta.storageAgent.save(data, 1, 1);
+    testMeta.storageAgent.copyToHDFS(1, 1);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+    Assert.assertEquals("dataOf1", data, decoded);
+  }
+
+  @Test
+  public void testLoad() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageAgent.save(dataOf1, 1, 1);
+    testMeta.storageAgent.copyToHDFS(1, 1);
+    testMeta.storageAgent.save(dataOf2, 2, 1);
+    testMeta.storageAgent.copyToHDFS(2, 1);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1);
+
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1);
+    Assert.assertEquals("data of 1", dataOf1, decoded1);
+    Assert.assertEquals("data of 2", dataOf2, decoded2);
+  }
+
+  @Test
+  public void testRecovery() throws IOException
+  {
+    testSave();
+    testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null);
+    testSave();
+  }
+
+  @Test
+  public void testDelete() throws IOException
+  {
+    testLoad();
+    testMeta.storageAgent.delete(1, 1);
+    Path appPath = new Path(testMeta.applicationPath);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertTrue("operator 2 window 1", fileContext.util().exists(new Path(appPath + "/" + 2 + "/" + 1)));
+    Assert.assertFalse("operator 1 window 1", fileContext.util().exists(new Path(appPath + "/" + 1 + "/" + 1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 657f678..dfb4511 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.DTLoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
-
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.client.StramClientUtils;
@@ -456,8 +456,9 @@ public class StramClient
       if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */
         Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS);
         // use conf client side to pickup any proxy settings from dt-site.xml
-        dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(checkpointPath.toString(), conf));
+        dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointPath.toString(), conf));
       }
+
       if(dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR) == null){
         dag.setAttribute(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR,new BasicContainerOptConfigurator());
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index c7ac0cb..e28c097 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.net.NetUtils;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode.Controller;
 import com.datatorrent.api.Operator;
 import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.bufferserver.storage.DiskStorage;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
@@ -298,7 +300,7 @@ public class StramLocalCluster implements Runnable, Controller
       dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri);
     }
     if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
-      dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
+      dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
     }
     this.dnmgr = new StreamingContainerManager(dag);
     this.umbilical = new UmbilicalProtocolLocalImpl();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index dbb3d11..5246c9e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 95f4648..0847f3c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -76,6 +76,7 @@ import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.bufferserver.auth.AuthManager;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.common.util.NumberAggregate;
 import com.datatorrent.common.util.Pair;
@@ -2949,7 +2950,14 @@ public class StreamingContainerManager implements PlanContext
 
       this.finals = new FinalVars(finals, lp);
       StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
-      if (sa instanceof FSStorageAgent) {
+      if(sa instanceof AsyncFSStorageAgent){
+        // replace the default storage agent, if present
+        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent) sa;
+        if (fssa.path.contains(oldAppId)) {
+          fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
+        }
+      } else if (sa instanceof FSStorageAgent) {
         // replace the default storage agent, if present
         FSStorageAgent fssa = (FSStorageAgent) sa;
         if (fssa.path.contains(oldAppId)) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 24679dc..ea33970 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -27,8 +27,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
@@ -46,6 +45,9 @@ import com.datatorrent.api.Operator.Unifier;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 
 import com.datatorrent.bufferserver.util.Codec;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.OperatorDeployInfo;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
@@ -99,12 +101,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   public final BlockingQueue<StatsListener.OperatorResponse> commandResponse;
   private final List<Field> metricFields;
   private final Map<String, Method> metricMethods;
+  private ExecutorService executorService;
+  private Queue<Pair<FutureTask<Stats.CheckpointStats>, Long>> taskQueue;
   protected Stats.CheckpointStats checkpointStats;
 
   public Node(OPERATOR operator, OperatorContext context)
   {
     this.operator = operator;
     this.context = context;
+    executorService = Executors.newSingleThreadExecutor();
+    taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, Long>>();
 
     outputs = new HashMap<String, Sink<Object>>();
 
@@ -173,6 +179,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
       pcpair.component.teardown();
     }
 
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
     operator.teardown();
   }
 
@@ -405,6 +414,21 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
       checkpointStats = null;
       checkpoint = null;
     }
+    else {
+      Pair<FutureTask<Stats.CheckpointStats>, Long> pair = taskQueue.peek();
+      if (pair != null && pair.getFirst().isDone()) {
+        taskQueue.poll();
+        try {
+          stats.checkpointStats = pair.getFirst().get();
+          stats.checkpoint = new Checkpoint(pair.getSecond(), applicationWindowCount, checkpointWindowCount);
+          if (operator instanceof Operator.CheckpointListener) {
+            ((Operator.CheckpointListener) operator).checkpointed(pair.getSecond());
+          }
+        } catch (Exception ex) {
+          throw DTThrowable.wrapIfChecked(ex);
+        }
+      }
+    }
 
     context.report(stats, windowId);
   }
@@ -440,6 +464,25 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
           checkpointStats = new Stats.CheckpointStats();
           checkpointStats.checkpointStartTime = System.currentTimeMillis();
           ba.save(operator, id, windowId);
+          if (ba instanceof AsyncFSStorageAgent) {
+            AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
+            if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+              CheckpointHandler checkpointHandler = new CheckpointHandler();
+              checkpointHandler.agent = asyncFSStorageAgent;
+              checkpointHandler.operatorId = id;
+              checkpointHandler.windowId = windowId;
+              checkpointHandler.stats = checkpointStats;
+              FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
+              taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
+              executorService.submit(futureTask);
+              checkpoint = null;
+              checkpointStats = null;
+              return;
+            }
+            else {
+              asyncFSStorageAgent.copyToHDFS(id, windowId);
+            }
+          }
           checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime;
         }
         catch (IOException ie) {
@@ -570,5 +613,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
     deactivateSinks();
   }
 
+  private class CheckpointHandler implements Callable<Stats.CheckpointStats>
+  {
+
+    public AsyncFSStorageAgent agent;
+    public int operatorId;
+    public long windowId;
+    public Stats.CheckpointStats stats;
+
+    @Override
+    public Stats.CheckpointStats call() throws Exception
+    {
+      agent.copyToHDFS(id, windowId);
+      stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
+      return stats;
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(Node.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 7c0432d..5b90c04 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -40,6 +40,8 @@ import com.datatorrent.api.Partitioner.Partition;
 import com.datatorrent.api.Partitioner.PartitionKeys;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StramEvent;
@@ -941,7 +943,11 @@ public class PhysicalPlan implements Serializable
     try {
       LOG.debug("Writing activation checkpoint {} {} {}", checkpoint, oper, oo);
       long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
-      oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oo, oper.id, windowId);
+      StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
+      agent.save(oo, oper.id, windowId);
+      if (agent instanceof AsyncFSStorageAgent) {
+        ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId);
+      }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown
       throw new IllegalStateException("Failed to write operator state after partition change " + oper, e);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index dd804ec..4072894 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -37,6 +37,7 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
 import com.datatorrent.stram.api.Checkpoint;
@@ -111,6 +112,9 @@ public class CheckpointTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null);
+    storageAgent.setSyncCheckpoint(true);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
     dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
     dag.setAttribute(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -127,14 +131,13 @@ public class CheckpointTest
     sc.setHeartbeatMonitoringEnabled(false);
     sc.run();
 
-    StorageAgent fssa = sc.getDAG().getValue(OperatorContext.STORAGE_AGENT);
     StreamingContainerManager dnm = sc.dnmgr;
     PhysicalPlan plan = dnm.getPhysicalPlan();
     Assert.assertEquals("number required containers", 1, dnm.getPhysicalPlan().getContainers().size());
 
     PTOperator o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);
     Set<Long> checkpoints = Sets.newHashSet();
-    for (long windowId : fssa.getWindowIds(o1p1.getId())) {
+    for (long windowId : storageAgent.getWindowIds(o1p1.getId())) {
       checkpoints.add(windowId);
     }
     Assert.assertEquals("number checkpoints " + checkpoints, 3, checkpoints.size());
@@ -142,7 +145,7 @@ public class CheckpointTest
 
     PTOperator o2p1 = plan.getOperators(dag.getMeta(o2)).get(0);
     checkpoints = Sets.newHashSet();
-    for (long windowId : fssa.getWindowIds(o2p1.getId())) {
+    for (long windowId : storageAgent.getWindowIds(o2p1.getId())) {
       checkpoints.add(windowId);
     }
     Assert.assertEquals("number checkpoints " + checkpoints, 1, checkpoints.size());
@@ -152,7 +155,7 @@ public class CheckpointTest
     Assert.assertNotNull("checkpoint not null for statefull operator " + o1p1, o1p1.stats.checkpointStats);
 
     for (Checkpoint cp : o1p1.checkpoints) {
-      Object load = fssa.load(o1p1.getId(), cp.windowId);
+      Object load = storageAgent.load(o1p1.getId(), cp.windowId);
       Assert.assertEquals("Stored Operator and Saved State", load.getClass(), o1p1.getOperatorMeta().getOperator().getClass());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index db1d9ec..78a1bd8 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -27,9 +27,10 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StorageAgent;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
-import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
@@ -291,15 +292,14 @@ public class LogicalPlanModificationTest
 
   }
 
-  @Test
-  public void testExecutionManager() throws Exception {
+  private void testExecutionManager(StorageAgent agent) throws Exception {
 
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
 
     StreamingContainerManager dnm = new StreamingContainerManager(dag);
-    Assert.assertEquals(""+dnm.containerStartRequests, dnm.containerStartRequests.size(), 0);
+    Assert.assertEquals("" + dnm.containerStartRequests, dnm.containerStartRequests.size(), 0);
 
 
     CreateOperatorRequest cor = new CreateOperatorRequest();
@@ -331,4 +331,16 @@ public class LogicalPlanModificationTest
 
   }
 
+  @Test
+  public void testExecutionManagerWithSyncStorageAgent() throws Exception
+  {
+    testExecutionManager(new FSStorageAgent(testMeta.dir, null));
+  }
+
+  @Test
+  public void testExecutionManagerWithAsyncStorageAgent() throws Exception
+  {
+    testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 9c169ee..15ad76e 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -27,13 +27,13 @@ import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Sets;
+
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.engine.Node;
@@ -150,6 +150,8 @@ public class PartitioningTest
   public void testDefaultPartitioning() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning");
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
 
     Integer[][] testData = {
       {4, 5}
@@ -249,6 +251,9 @@ public class PartitioningTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
+    File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning");
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
+
     CollectorOperator.receivedTuples.clear();
 
     TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
@@ -391,12 +396,12 @@ public class PartitioningTest
      *
      * @throws Exception
      */
-    @Test
-    public void testInputOperatorPartitioning() throws Exception
+
+    private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception
     {
       File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
-      LogicalPlan dag = new LogicalPlan();
       dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath());
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null));
 
       PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator());
       dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
@@ -418,7 +423,10 @@ public class PartitioningTest
         Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
         p.checkpoints.add(checkpoint);
         p.setRecoveryCheckpoint(checkpoint);
-        new FSStorageAgent(checkpointDir.getPath(), null).save(inputDeployed, p.getId(), 10L);
+        AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null);
+        agent.save(inputDeployed, p.getId(), 10L);
+        agent.copyToHDFS(p.getId(), 10l);
+
       }
 
       Assert.assertEquals("", Sets.newHashSet("partition_0", "partition_1", "partition_2"), partProperties);
@@ -447,6 +455,12 @@ public class PartitioningTest
 
     }
 
+    @Test
+    public void testInputOperatorPartitioningWithAsyncStorageAgent() throws Exception
+    {
+      LogicalPlan dag = new LogicalPlan();
+      testInputOperatorPartitioning(dag);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 8489c70..1881566 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -15,7 +15,6 @@
  */
 package com.datatorrent.stram;
 
-import com.datatorrent.stram.api.Checkpoint;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -23,22 +22,17 @@ import java.io.LineNumberReader;
 import java.util.Arrays;
 import java.util.Map;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
 import com.datatorrent.stram.StramLocalCluster.MockComponentFactory;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.Node;
-import com.datatorrent.stram.engine.OperatorContext;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
-import com.datatorrent.stram.engine.TestOutputOperator;
-import com.datatorrent.stram.engine.WindowGenerator;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.engine.*;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.support.ManualScheduledExecutorService;
@@ -75,6 +69,7 @@ public class StramLocalClusterTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
 
     TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
     genNode.setMaxTuples(2);
@@ -114,6 +109,9 @@ public class StramLocalClusterTest
   {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    agent.setSyncCheckpoint(true);
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
 
     TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     // data will be added externally from test

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 4d0cd37..99478f5 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -48,6 +48,7 @@ import com.sun.jersey.api.client.WebResource;
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.client.StramClientUtils.YarnClientHelper;
 import com.datatorrent.stram.engine.GenericTestOperator;
@@ -202,6 +203,9 @@ public class StramMiniClusterTest
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf);
     tb.addFromProperties(dagProps, null);
     LogicalPlan dag = createDAG(tb);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    agent.setSyncCheckpoint(true);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     Configuration yarnConf = new Configuration(yarnCluster.getConfig());
     StramClient client = new StramClient(yarnConf, dag);
     try {
@@ -357,7 +361,10 @@ public class StramMiniClusterTest
   {
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + "/" + testMeta.dir);
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null);
+    agent.setSyncCheckpoint(true);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
     dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 8515734..6172d8a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -46,6 +46,7 @@ import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StorageAgent;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
@@ -70,8 +71,7 @@ public class StramRecoveryTest
   private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class);
   @Rule public final TestMeta testMeta = new TestMeta();
 
-  @Test
-  public void testPhysicalPlanSerialization() throws Exception
+  private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
 
@@ -86,7 +86,7 @@ public class StramRecoveryTest
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
 
     TestPlanContext ctx = new TestPlanContext();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     PhysicalPlan plan = new PhysicalPlan(dag, ctx);
 
     ByteArrayOutputStream  bos = new ByteArrayOutputStream();
@@ -121,6 +121,18 @@ public class StramRecoveryTest
 
   }
 
+  @Test
+  public void testPhysicalPlanSerializationWithSyncAgent() throws Exception
+  {
+    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+  }
+
+  @Test
+  public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
+  {
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+  }
+
   public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
   {
     int processStatsCnt = 0;
@@ -144,14 +156,13 @@ public class StramRecoveryTest
    * Test serialization of the container manager with mock execution layer.
    * @throws Exception
    */
-  @Test
-  public void testContainerManager() throws Exception
+  private void testContainerManager(StorageAgent agent) throws Exception
   {
     FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
 
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
 
     StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class);
 
@@ -254,6 +265,18 @@ public class StramRecoveryTest
   }
 
   @Test
+  public void testContainerManagerWithSyncAgent() throws Exception
+  {
+    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+  }
+
+  @Test
+  public void testContainerManagerWithAsyncAgent() throws Exception
+  {
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
+  }
+
+  @Test
   public void testWriteAheadLog() throws Exception
   {
     final MutableInt flushCount = new MutableInt();
@@ -358,19 +381,17 @@ public class StramRecoveryTest
     scm.setPhysicalOperatorProperty(o1p1.getId(), "maxTuples", "50");
   }
 
-  @Test
-  public void testRestartApp() throws Exception
+  private void testRestartApp(StorageAgent agent, String appPath1) throws Exception
   {
     FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
     String appId1 = "app1";
     String appId2 = "app2";
-    String appPath1 = testMeta.dir + "/" + appId1;
     String appPath2 = testMeta.dir + "/" + appId2;
 
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     dag.addOperator("o1", StatsListeningOperator.class);
 
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false));
@@ -419,6 +440,21 @@ public class StramRecoveryTest
   }
 
   @Test
+  public void testRestartAppWithSyncAgent() throws Exception
+  {
+    String appPath1 = testMeta.dir + "/app1";
+    testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+  }
+
+  @Test
+  public void testRestartAppWithAsyncAgent() throws Exception
+  {
+    String appPath1 = testMeta.dir + "/app1";
+    String checkpointPath = testMeta.dir + "/localPath";
+    testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
+  }
+
+  @Test
   public void testRpcFailover() throws Exception
   {
     String appPath = testMeta.dir;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c1567b8..ba15a78 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -41,6 +42,7 @@ import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.Stateless;
 
 import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
@@ -56,12 +58,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
 import com.datatorrent.stram.appdata.AppDataPushAgent;
 import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
-import com.datatorrent.stram.engine.DefaultUnifier;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.TestAppDataQueryOperator;
-import com.datatorrent.stram.engine.TestAppDataResultOperator;
-import com.datatorrent.stram.engine.TestAppDataSourceOperator;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.engine.*;
 import com.datatorrent.stram.plan.TestPlanContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -471,6 +468,37 @@ public class StreamingContainerManagerTest {
   }
 
   @Test
+  public void testAsyncCheckpointWindowIds() throws Exception
+  {
+    File path = new File(testMeta.dir);
+    FileUtils.deleteDirectory(path.getAbsoluteFile());
+    FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
+
+    AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null);
+
+    long[] windowIds = new long[]{123L, 345L, 234L};
+    for (long windowId : windowIds) {
+      sa.save(windowId, 1, windowId);
+      sa.copyToHDFS(1, windowId);
+    }
+
+    Arrays.sort(windowIds);
+    long[] windowsIds = sa.getWindowIds(1);
+    Arrays.sort(windowsIds);
+    Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
+
+    for (long windowId : windowIds) {
+      sa.delete(1, windowId);
+    }
+    try {
+      sa.getWindowIds(1);
+      Assert.fail("There should not be any most recently saved windowId!");
+    } catch (IOException io) {
+      Assert.assertTrue("No State Saved", true);
+    }
+  }
+
+  @Test
   public void testProcessHeartbeat() throws Exception
   {
     FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
@@ -712,6 +740,8 @@ public class StreamingContainerManagerTest {
   @Test
   public void testPhysicalPropertyUpdate() throws Exception{
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testPhysicalPropertyUpdate").getAbsolutePath();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -735,7 +765,6 @@ public class StreamingContainerManagerTest {
           Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass)
   {
     LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     TestAppDataQueryOperator q = dag.addOperator("q", qClass);
     TestAppDataResultOperator r = dag.addOperator("r", rClass);
@@ -755,6 +784,8 @@ public class StreamingContainerManagerTest {
 
   private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
   {
+    String workingDir = new File("target/testAppDataSources").getAbsolutePath();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     StramLocalCluster lc = new StramLocalCluster(dag);
     lc.runAsync();
     StreamingContainerManager dnmgr = lc.dnmgr;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 5f68c6a..718bf1b 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.engine.StreamingContainer;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.debug.TupleRecorder.PortInfo;
@@ -210,6 +212,7 @@ public class TupleRecorderTest
   public void testRecordingFlow() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null));
 
     dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
     dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024);  // 1KB per part

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index a95956e..752adeb 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -41,6 +41,7 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.Stats.OperatorStats;
 
 import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -183,6 +184,7 @@ public class AutoMetricTest
   public void testMetricPropagation() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 83bd61f..142f45f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -20,6 +20,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.WaitCondition;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -28,13 +29,13 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.CircularBuffer;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
-
 import org.junit.Test;
 
 /**
@@ -124,6 +125,8 @@ public class InputOperatorTest
   public void testSomeMethod() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    String testWorkDir = new File("target").getAbsolutePath();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null));
     EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class);
     final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index bada257..0393394 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -15,7 +15,10 @@
  */
 package com.datatorrent.stram.engine;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -75,6 +78,8 @@ public class ProcessingModeTests
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
+    String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
     rip.setMaximumTuples(maxTuples);
     rip.setSimulateFailure(true);
@@ -97,6 +102,8 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -121,6 +128,8 @@ public class ProcessingModeTests
     CollectorOperator.duplicates.clear();
 
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 754b150..26515d4 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -15,6 +15,9 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
 import org.junit.Assert;
 import org.junit.Test;
@@ -133,6 +136,8 @@ public class SliderTest
   private void test(int applicationWindowCount, int slideByWindowCount) throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/sliderTest").getAbsolutePath();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);
     Input input = dag.addOperator("Input", new Input());
     Sum sum = dag.addOperator("Sum", new Sum());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 0019f56..0ededd4 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.*;
 
@@ -23,12 +24,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.Stats.OperatorStats.PortStats;
 import com.datatorrent.api.StatsListener;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.engine.StatsTest.TestCollector.TestCollectorStatsListener;
@@ -170,7 +173,8 @@ public class StatsTest
   {
     int tupleCount = 10;
     LogicalPlan dag = new LogicalPlan();
-
+    String workingDir = new File("target").getAbsolutePath();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
     TestInputStatsListener testInputStatsListener = new TestInputStatsListener();
     dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener}));
@@ -225,6 +229,8 @@ public class StatsTest
   private void baseTestForQueueSize(int maxTuples, TestCollectorStatsListener statsListener, DAG.Locality locality) throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
     TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
     testOper.setMaxTuples(maxTuples);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index a6897e0..4f7b842 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -24,12 +25,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Sink;
 
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
@@ -305,7 +305,8 @@ public class WindowGeneratorTest
   {
     logger.info("Testing Out of Sequence Error");
     LogicalPlan dag = new LogicalPlan();
-
+    String workingDir = new File("target/testOutofSequenceError").getAbsolutePath();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator());
     MyLogger ml = dag.addOperator("logger", new MyLogger());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 49c7844..9b8f0b2 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -56,6 +56,8 @@ import static org.junit.Assert.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramAppContext;
 import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -125,7 +127,9 @@ public class StramWebServicesTest extends JerseyTest
       protected void configureServlets()
       {
         LogicalPlan dag = new LogicalPlan();
-        dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File("target", StramWebServicesTest.class.getName()).getAbsolutePath());
+        String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath();
+        dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir);
+        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
         final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag);
 
         appContext = new TestAppContext();