You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/10/25 20:37:45 UTC

[02/36] incubator-apex-core git commit: APEX-162 #resolve Enhance StramTestSupport.TestMeta API.

APEX-162 #resolve Enhance StramTestSupport.TestMeta API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3c35cccb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3c35cccb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3c35cccb

Branch: refs/heads/feature-module
Commit: 3c35cccbd574d2a28e543b644deeb9f7c8a886e5
Parents: 809e6f6
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Oct 12 16:39:32 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Oct 12 16:39:32 2015 -0700

----------------------------------------------------------------------
 engine/pom.xml                                  |  2 +-
 .../com/datatorrent/stram/CheckpointTest.java   | 55 ++++--------
 .../stram/LogicalPlanModificationTest.java      | 34 ++++----
 .../datatorrent/stram/OutputUnifiedTest.java    | 29 ++++---
 .../stram/StramLocalClusterTest.java            | 17 ++--
 .../datatorrent/stram/StramMiniClusterTest.java |  6 +-
 .../datatorrent/stram/StramRecoveryTest.java    | 48 +++++------
 .../com/datatorrent/stram/StreamCodecTest.java  | 51 ++---------
 .../stram/StreamingContainerManagerTest.java    | 90 +++++++-------------
 .../stram/engine/AutoMetricTest.java            | 22 ++---
 .../stram/engine/ProcessingModeTests.java       | 17 ++--
 .../stram/plan/StreamPersistanceTests.java      | 27 ++----
 .../stram/support/StramTestSupport.java         | 64 ++++++++++++--
 13 files changed, 212 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 7974313..2165500 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>2322</maxAllowedViolations>
+          <maxAllowedViolations>2320</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ae28ebd..5d11b86 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -19,8 +19,7 @@
 package com.datatorrent.stram;
 
 import com.datatorrent.common.util.BaseOperator;
-import java.io.File;
-import java.io.IOException;
+
 import java.util.*;
 
 import com.google.common.collect.Maps;
@@ -30,8 +29,6 @@ import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -51,6 +48,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 
@@ -61,30 +59,9 @@ public class CheckpointTest
 {
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(CheckpointTest.class);
-  @Rule public TestMeta testMeta = new TestMeta();
-
-  /**
-   *
-   * @throws IOException
-   */
-  @Before
-  public void setupEachTest() throws IOException
-  {
-    try {
-      FileContext.getLocalFSFileContext().delete(
-              new Path(new File(testMeta.dir).getAbsolutePath()), true);
-    }
-    catch (Exception e) {
-      throw new RuntimeException("could not cleanup test dir", e);
-    }
-    //StramChild.eventloop.start();
-  }
 
-  @After
-  public void teardown()
-  {
-    //StramChild.eventloop.stop();
-  }
+  @Rule
+  public TestMeta testMeta = new TestMeta();
 
   private static class MockInputOperator extends BaseOperator implements InputOperator
   {
@@ -106,6 +83,14 @@ public class CheckpointTest
     }
   }
 
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
+
   /**
    * Test saving of operator state at window boundary.
    * @throws Exception
@@ -113,9 +98,7 @@ public class CheckpointTest
   @Test
   public void testBackup() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null);
+    AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.getPath(), null);
     storageAgent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
     dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
@@ -172,8 +155,7 @@ public class CheckpointTest
   public void testUpdateRecoveryCheckpoint() throws Exception
   {
     Clock clock = new SystemClock();
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -273,8 +255,7 @@ public class CheckpointTest
   public void testUpdateCheckpointsRecovery()
   {
     MockClock clock = new MockClock();
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
     dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 1);
 
@@ -338,8 +319,7 @@ public class CheckpointTest
   public void testUpdateCheckpointsProcessingTimeout()
   {
     MockClock clock = new MockClock();
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -419,8 +399,7 @@ public class CheckpointTest
   public void testBlockedOperatorContainerRestart()
   {
     MockClock clock = new MockClock();
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 847f3fd..8a50124 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
 import javax.validation.ValidationException;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -46,18 +47,26 @@ import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
 import com.datatorrent.stram.plan.physical.PlanModifier;
+import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.google.common.collect.Sets;
 
 public class LogicalPlanModificationTest
 {
-  @Rule public TestMeta testMeta = new TestMeta();
+  private LogicalPlan dag;
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
 
   @Test
   public void testAddOperator()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -94,7 +103,6 @@ public class LogicalPlanModificationTest
   @Test
   public void testSetOperatorProperty()
   {
-    LogicalPlan dag = new LogicalPlan();
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     OperatorMeta o1Meta = dag.getMeta(o1);
 
@@ -121,8 +129,6 @@ public class LogicalPlanModificationTest
   @Test
   public void testRemoveOperator()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     OperatorMeta o1Meta = dag.getMeta(o1);
     GenericTestOperator o12 = dag.addOperator("o12", GenericTestOperator.class);
@@ -191,8 +197,6 @@ public class LogicalPlanModificationTest
   @Test
   public void testRemoveOperator2()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     OperatorMeta o1Meta = dag.getMeta(o1);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -233,8 +237,6 @@ public class LogicalPlanModificationTest
   @Test
   public void testRemoveStream()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
 
@@ -255,8 +257,6 @@ public class LogicalPlanModificationTest
   @Test
   public void testAddStream()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
 
@@ -295,10 +295,8 @@ public class LogicalPlanModificationTest
 
   }
 
-  private void testExecutionManager(StorageAgent agent) throws Exception {
-
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+  private void testExecutionManager(StorageAgent agent) throws Exception
+  {
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
 
     StreamingContainerManager dnm = new StreamingContainerManager(dag);
@@ -337,13 +335,13 @@ public class LogicalPlanModificationTest
   @Test
   public void testExecutionManagerWithSyncStorageAgent() throws Exception
   {
-    testExecutionManager(new FSStorageAgent(testMeta.dir, null));
+    testExecutionManager(new FSStorageAgent(testMeta.getPath(), null));
   }
 
   @Test
   public void testExecutionManagerWithAsyncStorageAgent() throws Exception
   {
-    testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null));
+    testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
index f0461ba..7581cc3 100644
--- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -50,12 +51,18 @@ public class OutputUnifiedTest
   @Rule
   public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
 
-  @Test
-  public void testManyToOnePartition() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+  }
 
+  @Test
+  public void testManyToOnePartition() throws Exception
+  {
     TestInputOperator i1 = new TestInputOperator();
     dag.addOperator("i1", i1);
 
@@ -83,11 +90,8 @@ public class OutputUnifiedTest
   }
 
   @Test
-  public void testMxNPartition() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
-
+  public void testMxNPartition() throws Exception
+  {
     TestInputOperator i1 = new TestInputOperator();
     dag.addOperator("i1", i1);
 
@@ -117,11 +121,8 @@ public class OutputUnifiedTest
   }
 
   @Test
-  public void testParallelPartition() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
-
+  public void testParallelPartition() throws Exception
+  {
     TestInputOperator i1 = new TestInputOperator();
     dag.addOperator("i1", i1);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index c784fd1..aaf92b8 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -29,8 +29,6 @@ import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context;
-
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
 import com.datatorrent.stram.StramLocalCluster.MockComponentFactory;
@@ -48,11 +46,12 @@ public class StramLocalClusterTest
   @Rule
   public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
 
+  private LogicalPlan dag;
+
   @Before
   public void setup() throws IOException
   {
-//    StramChild.eventloop = new DefaultEventLoop("StramLocalClusterTestEventLoop");
-//    StramChild.eventloop.start();
+    dag = StramTestSupport.createDAG(testMeta);
   }
 
   @After
@@ -70,9 +69,7 @@ public class StramLocalClusterTest
   @Test
   public void testLocalClusterInitShutdown() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
 
     TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
     genNode.setMaxTuples(2);
@@ -110,11 +107,9 @@ public class StramLocalClusterTest
   @SuppressWarnings("SleepWhileInLoop")
   public void testRecovery() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.getPath(), null);
     agent.setSyncCheckpoint(true);
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
 
     TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     // data will be added externally from test

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index f0fd325..d5cb14f 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -227,8 +227,8 @@ public class StramMiniClusterTest
   private LogicalPlan createDAG(LogicalPlanConfiguration lpc) throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
-    lpc.prepareDAG(dag,null,"testApp");
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
+    lpc.prepareDAG(dag, null, "testApp");
     dag.validate();
     Assert.assertEquals("", Integer.valueOf(128), dag.getValue(DAG.MASTER_MEMORY_MB));
     Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS));
@@ -360,7 +360,7 @@ public class StramMiniClusterTest
   {
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
     dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 89ae3e7..6dbdcf0 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.test.MockitoUtil;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -65,6 +66,7 @@ import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
 import com.datatorrent.stram.plan.physical.PhysicalPlanTest.PartitioningTestOperator;
+import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 
 import static org.junit.Assert.assertEquals;
@@ -72,12 +74,20 @@ import static org.junit.Assert.assertEquals;
 public class StramRecoveryTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class);
-  @Rule public final TestMeta testMeta = new TestMeta();
 
-  private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
+  @Rule
+  public final TestMeta testMeta = new TestMeta();
+
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
   {
-    LogicalPlan dag = new LogicalPlan();
+    dag = StramTestSupport.createDAG(testMeta);
+  }
 
+  private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
+  {
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     PartitioningTestOperator o2 = dag.addOperator("o2", PartitioningTestOperator.class);
     o2.setPartitionCount(3);
@@ -127,13 +137,13 @@ public class StramRecoveryTest
   @Test
   public void testPhysicalPlanSerializationWithSyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null));
   }
 
   @Test
   public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null));
   }
 
   public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
@@ -161,10 +171,6 @@ public class StramRecoveryTest
    */
   private void testContainerManager(StorageAgent agent) throws Exception
   {
-    FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
-
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
 
     StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class);
@@ -188,8 +194,7 @@ public class StramRecoveryTest
     assertEquals("state " + o1p1, PTOperator.State.PENDING_DEPLOY, o1p1.getState());
 
     // test restore initial snapshot + log
-    dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    dag = StramTestSupport.createDAG(testMeta);
     scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
     dag = scm.getLogicalPlan();
     plan = scm.getPhysicalPlan();
@@ -245,8 +250,7 @@ public class StramRecoveryTest
     checkpoint(scm, o1p1, offlineCheckpoint);
 
     // test restore
-    dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+    dag = StramTestSupport.createDAG(testMeta);
     scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
 
     Assert.assertNotSame("dag references", dag, scm.getLogicalPlan());
@@ -270,13 +274,13 @@ public class StramRecoveryTest
   @Test
   public void testContainerManagerWithSyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+    testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null));
   }
 
   @Test
   public void testContainerManagerWithAsyncAgent() throws Exception
   {
-    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
+    testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null));
   }
 
   @Test
@@ -284,9 +288,7 @@ public class StramRecoveryTest
   {
     final MutableInt flushCount = new MutableInt();
     final MutableBoolean isClosed = new MutableBoolean(false);
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.getPath(), null));
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     StreamingContainerManager scm = new StreamingContainerManager(dag);
@@ -386,12 +388,10 @@ public class StramRecoveryTest
 
   private void testRestartApp(StorageAgent agent, String appPath1) throws Exception
   {
-    FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
     String appId1 = "app1";
     String appId2 = "app2";
-    String appPath2 = testMeta.dir + "/" + appId2;
+    String appPath2 = testMeta.getPath() + "/" + appId2;
 
-    LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
     dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
@@ -445,21 +445,21 @@ public class StramRecoveryTest
   @Test
   public void testRestartAppWithSyncAgent() throws Exception
   {
-    String appPath1 = testMeta.dir + "/app1";
+    final String appPath1 = testMeta.getPath() + "/app1";
     testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
   }
 
   @Test
   public void testRestartAppWithAsyncAgent() throws Exception
   {
-    String appPath1 = testMeta.dir + "/app1";
+    final String appPath1 = testMeta.getPath() + "/app1";
     testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
   }
 
   @Test
   public void testRpcFailover() throws Exception
   {
-    String appPath = testMeta.dir;
+    String appPath = testMeta.getPath();
     Configuration conf = new Configuration(false);
     final AtomicBoolean timedout = new AtomicBoolean();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 6bfa591..ddf3448 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.*;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -41,15 +42,20 @@ import org.junit.Test;
  */
 public class StreamCodecTest
 {
+  private LogicalPlan dag;
+
   @Rule
   public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
 
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
+
   @Test
   public void testStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -107,9 +113,6 @@ public class StreamCodecTest
   @Test
   public void testStreamCodecReuse()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -154,9 +157,6 @@ public class StreamCodecTest
   @Test
   public void testDefaultStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     DefaultCodecOperator node2 = dag.addOperator("node2", DefaultCodecOperator.class);
     DefaultCodecOperator node3 = dag.addOperator("node3", DefaultCodecOperator.class);
@@ -213,9 +213,6 @@ public class StreamCodecTest
   @Test
   public void testPartitioningStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
@@ -264,9 +261,6 @@ public class StreamCodecTest
   @Test
   public void testMxNPartitioningStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -331,9 +325,6 @@ public class StreamCodecTest
   @Test
   public void testParallelPartitioningStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -422,9 +413,6 @@ public class StreamCodecTest
   @Test
   public void testMultipleInputStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     TestStreamCodec serDe = new TestStreamCodec();
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -477,9 +465,6 @@ public class StreamCodecTest
   @Test
   public void testPartitioningMultipleInputStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
@@ -556,9 +541,6 @@ public class StreamCodecTest
   @Test
   public void testMultipleStreamCodecs()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -613,9 +595,6 @@ public class StreamCodecTest
   @Test
   public void testPartitioningMultipleStreamCodecs()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -699,9 +678,6 @@ public class StreamCodecTest
   @Test
   public void testMxNMultipleStreamCodecs()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -803,9 +779,6 @@ public class StreamCodecTest
   @Test
   public void testInlineStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -887,9 +860,6 @@ public class StreamCodecTest
   @Test
   public void testCascadingStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -974,9 +944,6 @@ public class StreamCodecTest
   @Test
   public void testDynamicPartitioningStreamCodec()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
     dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitioningTest.PartitionLoadWatch()));

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 710440d..b257632 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputByteBuffer;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -83,10 +84,20 @@ import org.eclipse.jetty.websocket.WebSocket;
 
 public class StreamingContainerManagerTest
 {
-  @Rule public TestMeta testMeta = new TestMeta();
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
 
   @Test
-  public void testDeployInfoSerialization() throws Exception {
+  public void testDeployInfoSerialization() throws Exception
+  {
     OperatorDeployInfo ndi = new OperatorDeployInfo();
     ndi.name = "node1";
     ndi.type = OperatorDeployInfo.OperatorType.GENERIC;
@@ -136,11 +147,8 @@ public class StreamingContainerManagerTest
   }
 
   @Test
-  public void testGenerateDeployInfo() {
-
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
+  public void testGenerateDeployInfo()
+  {
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -238,9 +246,8 @@ public class StreamingContainerManagerTest
   }
 
   @Test
-  public void testStaticPartitioning() {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+  public void testStaticPartitioning()
+  {
     //
     //            ,---> node2----,
     //            |              |
@@ -355,9 +362,6 @@ public class StreamingContainerManagerTest
   @Test
   public void testRecoveryOrder() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -406,9 +410,6 @@ public class StreamingContainerManagerTest
   @Test
   public void testRecoveryUpstreamInline() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -444,11 +445,9 @@ public class StreamingContainerManagerTest
   }
 
   @Test
-  public void testCheckpointWindowIds() throws Exception {
-    File path =  new File(testMeta.dir);
-    FileUtils.deleteDirectory(path.getAbsoluteFile());
-
-    FSStorageAgent sa = new FSStorageAgent(path.getPath(), null);
+  public void testCheckpointWindowIds() throws Exception
+  {
+    FSStorageAgent sa = new FSStorageAgent(testMeta.getPath(), null);
 
     long[] windowIds = new long[]{123L, 345L, 234L};
     for (long windowId : windowIds) {
@@ -475,11 +474,7 @@ public class StreamingContainerManagerTest
   @Test
   public void testAsyncCheckpointWindowIds() throws Exception
   {
-    File path = new File(testMeta.dir);
-    FileUtils.deleteDirectory(path.getAbsoluteFile());
-    FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
-
-    AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null);
+    AsyncFSStorageAgent sa = new AsyncFSStorageAgent(testMeta.getPath(), null);
 
     long[] windowIds = new long[]{123L, 345L, 234L};
     for (long windowId : windowIds) {
@@ -506,13 +501,8 @@ public class StreamingContainerManagerTest
   @Test
   public void testProcessHeartbeat() throws Exception
   {
-    FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
-
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
-     dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
@@ -655,9 +645,6 @@ public class StreamingContainerManagerTest
   @Test
   public void testValidGenericOperatorDeployInfoType()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     TestGeneratorInputOperator.ValidGenericOperator o2 = dag.addOperator("o2", TestGeneratorInputOperator.ValidGenericOperator.class);
 
@@ -683,9 +670,6 @@ public class StreamingContainerManagerTest
   @Test
   public void testValidInputOperatorDeployInfoType()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     TestGeneratorInputOperator.ValidInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.ValidInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
 
@@ -711,8 +695,6 @@ public class StreamingContainerManagerTest
   @Test
   public void testOperatorShutdown()
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -781,7 +763,6 @@ public class StreamingContainerManagerTest
 
   private void testDownStreamPartition(Locality locality) throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
@@ -815,8 +796,7 @@ public class StreamingContainerManagerTest
   @Test
   public void testPhysicalPropertyUpdate() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -837,10 +817,9 @@ public class StreamingContainerManagerTest
     lc.shutdown();
   }
 
-  private LogicalPlan getTestAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass,
+  private void setupAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass,
           Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass)
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     TestAppDataQueryOperator q = dag.addOperator("q", qClass);
     TestAppDataResultOperator r = dag.addOperator("r", rClass);
@@ -854,13 +833,11 @@ public class StreamingContainerManagerTest
     dag.addStream("o1-to-ds", o1.outport, ds.inport1);
     dag.addStream("q-to-ds", q.outport, ds.query);
     dag.addStream("ds-to-r", ds.result, r.inport);
-
-    return dag;
   }
 
-  private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
+  private void testAppDataSources(boolean appendQIDToTopic) throws Exception
   {
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     StramLocalCluster lc = new StramLocalCluster(dag);
     lc.runAsync();
     StreamingContainerManager dnmgr = lc.dnmgr;
@@ -883,22 +860,22 @@ public class StreamingContainerManagerTest
   @Test
   public void testGetAppDataSources1() throws Exception
   {
-    LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class);
-    testAppDataSources(dag, true);
+    setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class);
+    testAppDataSources(true);
   }
 
   @Test
   public void testGetAppDataSources2() throws Exception
   {
-    LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class);
-    testAppDataSources(dag, false);
+    setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class);
+    testAppDataSources(false);
   }
 
   @Test
   public void testGetAppDataSources3() throws Exception
   {
-    LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class);
-    testAppDataSources(dag, false);
+    setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class);
+    testAppDataSources(false);
   }
 
   @Test
@@ -933,8 +910,7 @@ public class StreamingContainerManagerTest
     try {
       server.start();
       int port = server.getPort();
-      LogicalPlan dag = new LogicalPlan();
-      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+      dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
       TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
       GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
       dag.addStream("o1.outport", o1.outport, o2.inport1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index a1312a5..f6451c9 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -176,6 +177,14 @@ public class AutoMetricTest
 
   }
 
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
+
   /**
    * Verify custom stats generated by operator are propagated and trigger repartition.
    *
@@ -185,8 +194,7 @@ public class AutoMetricTest
   @SuppressWarnings("SleepWhileInLoop")
   public void testMetricPropagation() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
 
@@ -234,9 +242,7 @@ public class AutoMetricTest
     CountDownLatch latch = new CountDownLatch(1);
 
     LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
-    LogicalPlan dag = new LogicalPlan();
 
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
     TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
 
     OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
@@ -264,8 +270,7 @@ public class AutoMetricTest
     CountDownLatch latch = new CountDownLatch(2);
 
     LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
 
     OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
@@ -289,8 +294,7 @@ public class AutoMetricTest
   public void testInjectionOfDefaultMetricsAggregator() throws Exception
   {
     LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
     TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
 
     OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
@@ -365,9 +369,7 @@ public class AutoMetricTest
     CountDownLatch latch = new CountDownLatch(1);
 
     LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
-    LogicalPlan dag = new LogicalPlan();
 
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
     TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
 
     OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index cef671e..6df4e94 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -43,6 +43,7 @@ import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -52,7 +53,11 @@ import com.datatorrent.stram.tuple.Tuple;
  */
 public class ProcessingModeTests
 {
-  @Rule public TestMeta testMeta = new TestMeta();
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  private LogicalPlan dag;
+
   ProcessingMode processingMode;
   int maxTuples = 30;
 
@@ -64,6 +69,7 @@ public class ProcessingModeTests
   @Before
   public void setup() throws IOException
   {
+    dag = StramTestSupport.createDAG(testMeta);
     StreamingContainer.eventloop.start();
   }
 
@@ -79,11 +85,10 @@ public class ProcessingModeTests
     CollectorOperator.collection.clear();
     CollectorOperator.duplicates.clear();
 
-    LogicalPlan dag = new LogicalPlan();
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
     rip.setMaximumTuples(maxTuples);
     rip.setSimulateFailure(true);
@@ -105,8 +110,7 @@ public class ProcessingModeTests
     CollectorOperator.collection.clear();
     CollectorOperator.duplicates.clear();
 
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -130,8 +134,7 @@ public class ProcessingModeTests
     CollectorOperator.collection.clear();
     CollectorOperator.duplicates.clear();
 
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index 1839c91..5dca8a4 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -203,10 +204,17 @@ public class StreamPersistanceTests
   {
   }
 
+  private LogicalPlan dag;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+  }
+
   @Test
   public void testPersistStreamOperatorIsAdded()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     TestRecieverOperator persister = new TestRecieverOperator();
@@ -222,7 +230,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOperatorIsAddedPerSink()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
     GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -255,7 +262,6 @@ public class StreamPersistanceTests
   public void testaddStreamThrowsExceptionOnInvalidLoggerType()
   {
     // Test Logger with non-optional output ports
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
@@ -302,7 +308,6 @@ public class StreamPersistanceTests
   public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType()
   {
     // Test for input port belonging to different object
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     TestRecieverOperator persister = new TestRecieverOperator();
@@ -322,7 +327,6 @@ public class StreamPersistanceTests
   public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved()
   {
     // Remove Stream and check if persist operator is removed
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     TestRecieverOperator persister = new TestRecieverOperator();
@@ -340,7 +344,6 @@ public class StreamPersistanceTests
   public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved()
   {
     // Remove sink and check if corresponding persist operator is removed
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
     GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -383,7 +386,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
     GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -409,7 +411,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator input1 = dag.addOperator("input1", AscendingNumbersOperator.class);
     // Add PersistOperator directly to dag
     final TestRecieverOperator x = dag.addOperator("x", new TestRecieverOperator());
@@ -603,7 +604,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
     TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -617,7 +617,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
     final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -632,7 +631,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
     PassThruOperatorWithCodec passThru2 = dag.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3));
@@ -696,7 +694,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PassThruOperatorWithCodec passThru1 = dag.addOperator("PassThrough1", new PassThruOperatorWithCodec(2));
     PassThruOperatorWithCodec passThru2 = dag.addOperator("PassThrough2", new PassThruOperatorWithCodec(3));
@@ -829,7 +826,6 @@ public class StreamPersistanceTests
   @Test
   public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
     final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -883,7 +879,6 @@ public class StreamPersistanceTests
   @Test
   public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException
   {
-    LogicalPlan dag = new LogicalPlan();
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
     PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
     final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -942,10 +937,6 @@ public class StreamPersistanceTests
   @Test
   public void testDynamicPartitioning() throws ClassNotFoundException, IOException
   {
-    LogicalPlan dag = new LogicalPlan();
-
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
 
     final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 314fdfc..cf2a887 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -22,6 +22,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,6 +54,7 @@ import com.datatorrent.stram.api.AppDataSource;
 import com.datatorrent.stram.api.BaseContext;
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.engine.WindowGenerator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -253,23 +258,54 @@ abstract public class StramTestSupport
 
   public static class TestMeta extends TestWatcher
   {
-    public String dir = null;
+    private File dir;
 
     @Override
     protected void starting(org.junit.runner.Description description)
     {
-      String methodName = description.getMethodName();
-      String className = description.getClassName();
-      //className = className.substring(className.lastIndexOf('.') + 1);
-      this.dir = "target/" + className + "/" + methodName;
-      new File(this.dir).mkdirs();
+      final String methodName = description.getMethodName();
+      final String className = description.getClassName();
+      dir = new File("target/" + className + "/" + methodName); // per-test working directory: target/<class>/<method>
+      try {
+        Files.createDirectories(dir.toPath());
+      } catch (FileAlreadyExistsException e) { // per the Files.createDirectories contract: path exists but is not a directory
+        try {
+          Files.delete(dir.toPath()); // remove the conflicting entry, then retry the directory creation
+          Files.createDirectories(dir.toPath());
+        } catch (IOException ioe) {
+          throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), ioe); // cause is the retry failure, not the outer exception
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), e);
+      }
     }
 
     @Override
     protected void finished(org.junit.runner.Description description)
     {
-      FileUtils.deleteQuietly(new File(this.dir));
+      FileUtils.deleteQuietly(dir);
+    }
+
+    public String getPath()
+    {
+      return dir.getPath();
+    }
+
+    public String getAbsolutePath()
+    {
+      return dir.getAbsolutePath();
+    }
+
+    public Path toPath()
+    {
+      return dir.toPath();
+    }
+
+    public URI toURI()
+    {
+      return dir.toURI();
     }
+
   }
 
   public static class TestHomeDirectory extends TestWatcher
@@ -333,6 +369,20 @@ abstract public class StramTestSupport
     }
   }
 
+  public static LogicalPlan createDAG(final TestMeta testMeta, final String suffix)
+  {
+    if (suffix == null) {
+      throw new NullPointerException();
+    }
+    LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.getPath() + suffix);
+    return dag;
+  }
+
+  public static LogicalPlan createDAG(final TestMeta testMeta)
+  {
+    return createDAG(testMeta, "");
+  }
 
   public static class MemoryStorageAgent implements StorageAgent, Serializable
   {