You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/10/25 20:37:45 UTC
[02/36] incubator-apex-core git commit: APEX-162 #resolve Enhance StramTestSupport.TestMeta API.
APEX-162 #resolve Enhance StramTestSupport.TestMeta API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3c35cccb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3c35cccb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3c35cccb
Branch: refs/heads/feature-module
Commit: 3c35cccbd574d2a28e543b644deeb9f7c8a886e5
Parents: 809e6f6
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Oct 12 16:39:32 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Oct 12 16:39:32 2015 -0700
----------------------------------------------------------------------
engine/pom.xml | 2 +-
.../com/datatorrent/stram/CheckpointTest.java | 55 ++++--------
.../stram/LogicalPlanModificationTest.java | 34 ++++----
.../datatorrent/stram/OutputUnifiedTest.java | 29 ++++---
.../stram/StramLocalClusterTest.java | 17 ++--
.../datatorrent/stram/StramMiniClusterTest.java | 6 +-
.../datatorrent/stram/StramRecoveryTest.java | 48 +++++------
.../com/datatorrent/stram/StreamCodecTest.java | 51 ++---------
.../stram/StreamingContainerManagerTest.java | 90 +++++++-------------
.../stram/engine/AutoMetricTest.java | 22 ++---
.../stram/engine/ProcessingModeTests.java | 17 ++--
.../stram/plan/StreamPersistanceTests.java | 27 ++----
.../stram/support/StramTestSupport.java | 64 ++++++++++++--
13 files changed, 212 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 7974313..2165500 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>2322</maxAllowedViolations>
+ <maxAllowedViolations>2320</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ae28ebd..5d11b86 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -19,8 +19,7 @@
package com.datatorrent.stram;
import com.datatorrent.common.util.BaseOperator;
-import java.io.File;
-import java.io.IOException;
+
import java.util.*;
import com.google.common.collect.Maps;
@@ -30,8 +29,6 @@ import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -51,6 +48,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
@@ -61,30 +59,9 @@ public class CheckpointTest
{
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(CheckpointTest.class);
- @Rule public TestMeta testMeta = new TestMeta();
-
- /**
- *
- * @throws IOException
- */
- @Before
- public void setupEachTest() throws IOException
- {
- try {
- FileContext.getLocalFSFileContext().delete(
- new Path(new File(testMeta.dir).getAbsolutePath()), true);
- }
- catch (Exception e) {
- throw new RuntimeException("could not cleanup test dir", e);
- }
- //StramChild.eventloop.start();
- }
- @After
- public void teardown()
- {
- //StramChild.eventloop.stop();
- }
+ @Rule
+ public TestMeta testMeta = new TestMeta();
private static class MockInputOperator extends BaseOperator implements InputOperator
{
@@ -106,6 +83,14 @@ public class CheckpointTest
}
}
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
+
/**
* Test saving of operator state at window boundary.
* @throws Exception
@@ -113,9 +98,7 @@ public class CheckpointTest
@Test
public void testBackup() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null);
+ AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.getPath(), null);
storageAgent.setSyncCheckpoint(true);
dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
@@ -172,8 +155,7 @@ public class CheckpointTest
public void testUpdateRecoveryCheckpoint() throws Exception
{
Clock clock = new SystemClock();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -273,8 +255,7 @@ public class CheckpointTest
public void testUpdateCheckpointsRecovery()
{
MockClock clock = new MockClock();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 1);
@@ -338,8 +319,7 @@ public class CheckpointTest
public void testUpdateCheckpointsProcessingTimeout()
{
MockClock clock = new MockClock();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -419,8 +399,7 @@ public class CheckpointTest
public void testBlockedOperatorContainerRestart()
{
MockClock clock = new MockClock();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 847f3fd..8a50124 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import javax.validation.ValidationException;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -46,18 +47,26 @@ import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PlanModifier;
+import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import com.google.common.collect.Sets;
public class LogicalPlanModificationTest
{
- @Rule public TestMeta testMeta = new TestMeta();
+ private LogicalPlan dag;
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
@Test
public void testAddOperator()
{
- LogicalPlan dag = new LogicalPlan();
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -94,7 +103,6 @@ public class LogicalPlanModificationTest
@Test
public void testSetOperatorProperty()
{
- LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
OperatorMeta o1Meta = dag.getMeta(o1);
@@ -121,8 +129,6 @@ public class LogicalPlanModificationTest
@Test
public void testRemoveOperator()
{
- LogicalPlan dag = new LogicalPlan();
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o12 = dag.addOperator("o12", GenericTestOperator.class);
@@ -191,8 +197,6 @@ public class LogicalPlanModificationTest
@Test
public void testRemoveOperator2()
{
- LogicalPlan dag = new LogicalPlan();
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -233,8 +237,6 @@ public class LogicalPlanModificationTest
@Test
public void testRemoveStream()
{
- LogicalPlan dag = new LogicalPlan();
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -255,8 +257,6 @@ public class LogicalPlanModificationTest
@Test
public void testAddStream()
{
- LogicalPlan dag = new LogicalPlan();
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -295,10 +295,8 @@ public class LogicalPlanModificationTest
}
- private void testExecutionManager(StorageAgent agent) throws Exception {
-
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+ private void testExecutionManager(StorageAgent agent) throws Exception
+ {
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
StreamingContainerManager dnm = new StreamingContainerManager(dag);
@@ -337,13 +335,13 @@ public class LogicalPlanModificationTest
@Test
public void testExecutionManagerWithSyncStorageAgent() throws Exception
{
- testExecutionManager(new FSStorageAgent(testMeta.dir, null));
+ testExecutionManager(new FSStorageAgent(testMeta.getPath(), null));
}
@Test
public void testExecutionManagerWithAsyncStorageAgent() throws Exception
{
- testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null));
+ testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
index f0461ba..7581cc3 100644
--- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.List;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -50,12 +51,18 @@ public class OutputUnifiedTest
@Rule
public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
- @Test
- public void testManyToOnePartition() throws Exception {
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+ }
+ @Test
+ public void testManyToOnePartition() throws Exception
+ {
TestInputOperator i1 = new TestInputOperator();
dag.addOperator("i1", i1);
@@ -83,11 +90,8 @@ public class OutputUnifiedTest
}
@Test
- public void testMxNPartition() throws Exception {
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
-
+ public void testMxNPartition() throws Exception
+ {
TestInputOperator i1 = new TestInputOperator();
dag.addOperator("i1", i1);
@@ -117,11 +121,8 @@ public class OutputUnifiedTest
}
@Test
- public void testParallelPartition() throws Exception {
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
-
+ public void testParallelPartition() throws Exception
+ {
TestInputOperator i1 = new TestInputOperator();
dag.addOperator("i1", i1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index c784fd1..aaf92b8 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -29,8 +29,6 @@ import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Context;
-
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
import com.datatorrent.stram.StramLocalCluster.MockComponentFactory;
@@ -48,11 +46,12 @@ public class StramLocalClusterTest
@Rule
public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+ private LogicalPlan dag;
+
@Before
public void setup() throws IOException
{
-// StramChild.eventloop = new DefaultEventLoop("StramLocalClusterTestEventLoop");
-// StramChild.eventloop.start();
+ dag = StramTestSupport.createDAG(testMeta);
}
@After
@@ -70,9 +69,7 @@ public class StramLocalClusterTest
@Test
public void testLocalClusterInitShutdown() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class);
genNode.setMaxTuples(2);
@@ -110,11 +107,9 @@ public class StramLocalClusterTest
@SuppressWarnings("SleepWhileInLoop")
public void testRecovery() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.getPath(), null);
agent.setSyncCheckpoint(true);
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
// data will be added externally from test
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index f0fd325..d5cb14f 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -227,8 +227,8 @@ public class StramMiniClusterTest
private LogicalPlan createDAG(LogicalPlanConfiguration lpc) throws Exception
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
- lpc.prepareDAG(dag,null,"testApp");
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
+ lpc.prepareDAG(dag, null, "testApp");
dag.validate();
Assert.assertEquals("", Integer.valueOf(128), dag.getValue(DAG.MASTER_MEMORY_MB));
Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS));
@@ -360,7 +360,7 @@ public class StramMiniClusterTest
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 89ae3e7..6dbdcf0 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
@@ -65,6 +66,7 @@ import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PhysicalPlanTest.PartitioningTestOperator;
+import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import static org.junit.Assert.assertEquals;
@@ -72,12 +74,20 @@ import static org.junit.Assert.assertEquals;
public class StramRecoveryTest
{
private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class);
- @Rule public final TestMeta testMeta = new TestMeta();
- private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
+ @Rule
+ public final TestMeta testMeta = new TestMeta();
+
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
{
- LogicalPlan dag = new LogicalPlan();
+ dag = StramTestSupport.createDAG(testMeta);
+ }
+ private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception
+ {
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
PartitioningTestOperator o2 = dag.addOperator("o2", PartitioningTestOperator.class);
o2.setPartitionCount(3);
@@ -127,13 +137,13 @@ public class StramRecoveryTest
@Test
public void testPhysicalPlanSerializationWithSyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+ testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null));
}
@Test
public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
+ testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null));
}
public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener
@@ -161,10 +171,6 @@ public class StramRecoveryTest
*/
private void testContainerManager(StorageAgent agent) throws Exception
{
- FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
-
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class);
@@ -188,8 +194,7 @@ public class StramRecoveryTest
assertEquals("state " + o1p1, PTOperator.State.PENDING_DEPLOY, o1p1.getState());
// test restore initial snapshot + log
- dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ dag = StramTestSupport.createDAG(testMeta);
scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
dag = scm.getLogicalPlan();
plan = scm.getPhysicalPlan();
@@ -245,8 +250,7 @@ public class StramRecoveryTest
checkpoint(scm, o1p1, offlineCheckpoint);
// test restore
- dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+ dag = StramTestSupport.createDAG(testMeta);
scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
Assert.assertNotSame("dag references", dag, scm.getLogicalPlan());
@@ -270,13 +274,13 @@ public class StramRecoveryTest
@Test
public void testContainerManagerWithSyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null));
+ testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null));
}
@Test
public void testContainerManagerWithAsyncAgent() throws Exception
{
- testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null));
+ testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null));
}
@Test
@@ -284,9 +288,7 @@ public class StramRecoveryTest
{
final MutableInt flushCount = new MutableInt();
final MutableBoolean isClosed = new MutableBoolean(false);
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.getPath(), null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
StreamingContainerManager scm = new StreamingContainerManager(dag);
@@ -386,12 +388,10 @@ public class StramRecoveryTest
private void testRestartApp(StorageAgent agent, String appPath1) throws Exception
{
- FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
String appId1 = "app1";
String appId2 = "app2";
- String appPath2 = testMeta.dir + "/" + appId2;
+ String appPath2 = testMeta.getPath() + "/" + appId2;
- LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
@@ -445,21 +445,21 @@ public class StramRecoveryTest
@Test
public void testRestartAppWithSyncAgent() throws Exception
{
- String appPath1 = testMeta.dir + "/app1";
+ final String appPath1 = testMeta.getPath() + "/app1";
testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
}
@Test
public void testRestartAppWithAsyncAgent() throws Exception
{
- String appPath1 = testMeta.dir + "/app1";
+ final String appPath1 = testMeta.getPath() + "/app1";
testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1);
}
@Test
public void testRpcFailover() throws Exception
{
- String appPath = testMeta.dir;
+ String appPath = testMeta.getPath();
Configuration conf = new Configuration(false);
final AtomicBoolean timedout = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 6bfa591..ddf3448 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.*;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -41,15 +42,20 @@ import org.junit.Test;
*/
public class StreamCodecTest
{
+ private LogicalPlan dag;
+
@Rule
public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
+
@Test
public void testStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -107,9 +113,6 @@ public class StreamCodecTest
@Test
public void testStreamCodecReuse()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -154,9 +157,6 @@ public class StreamCodecTest
@Test
public void testDefaultStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
DefaultCodecOperator node2 = dag.addOperator("node2", DefaultCodecOperator.class);
DefaultCodecOperator node3 = dag.addOperator("node3", DefaultCodecOperator.class);
@@ -213,9 +213,6 @@ public class StreamCodecTest
@Test
public void testPartitioningStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
@@ -264,9 +261,6 @@ public class StreamCodecTest
@Test
public void testMxNPartitioningStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -331,9 +325,6 @@ public class StreamCodecTest
@Test
public void testParallelPartitioningStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -422,9 +413,6 @@ public class StreamCodecTest
@Test
public void testMultipleInputStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
TestStreamCodec serDe = new TestStreamCodec();
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -477,9 +465,6 @@ public class StreamCodecTest
@Test
public void testPartitioningMultipleInputStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
@@ -556,9 +541,6 @@ public class StreamCodecTest
@Test
public void testMultipleStreamCodecs()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -613,9 +595,6 @@ public class StreamCodecTest
@Test
public void testPartitioningMultipleStreamCodecs()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -699,9 +678,6 @@ public class StreamCodecTest
@Test
public void testMxNMultipleStreamCodecs()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
@@ -803,9 +779,6 @@ public class StreamCodecTest
@Test
public void testInlineStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -887,9 +860,6 @@ public class StreamCodecTest
@Test
public void testCascadingStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -974,9 +944,6 @@ public class StreamCodecTest
@Test
public void testDynamicPartitioningStreamCodec()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitioningTest.PartitionLoadWatch()));
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 710440d..b257632 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputByteBuffer;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -83,10 +84,20 @@ import org.eclipse.jetty.websocket.WebSocket;
public class StreamingContainerManagerTest
{
- @Rule public TestMeta testMeta = new TestMeta();
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
@Test
- public void testDeployInfoSerialization() throws Exception {
+ public void testDeployInfoSerialization() throws Exception
+ {
OperatorDeployInfo ndi = new OperatorDeployInfo();
ndi.name = "node1";
ndi.type = OperatorDeployInfo.OperatorType.GENERIC;
@@ -136,11 +147,8 @@ public class StreamingContainerManagerTest
}
@Test
- public void testGenerateDeployInfo() {
-
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
+ public void testGenerateDeployInfo()
+ {
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -238,9 +246,8 @@ public class StreamingContainerManagerTest
}
@Test
- public void testStaticPartitioning() {
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+ public void testStaticPartitioning()
+ {
//
// ,---> node2----,
// | |
@@ -355,9 +362,6 @@ public class StreamingContainerManagerTest
@Test
public void testRecoveryOrder() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
@@ -406,9 +410,6 @@ public class StreamingContainerManagerTest
@Test
public void testRecoveryUpstreamInline() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
@@ -444,11 +445,9 @@ public class StreamingContainerManagerTest
}
@Test
- public void testCheckpointWindowIds() throws Exception {
- File path = new File(testMeta.dir);
- FileUtils.deleteDirectory(path.getAbsoluteFile());
-
- FSStorageAgent sa = new FSStorageAgent(path.getPath(), null);
+ public void testCheckpointWindowIds() throws Exception
+ {
+ FSStorageAgent sa = new FSStorageAgent(testMeta.getPath(), null);
long[] windowIds = new long[]{123L, 345L, 234L};
for (long windowId : windowIds) {
@@ -475,11 +474,7 @@ public class StreamingContainerManagerTest
@Test
public void testAsyncCheckpointWindowIds() throws Exception
{
- File path = new File(testMeta.dir);
- FileUtils.deleteDirectory(path.getAbsoluteFile());
- FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath"));
-
- AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null);
+ AsyncFSStorageAgent sa = new AsyncFSStorageAgent(testMeta.getPath(), null);
long[] windowIds = new long[]{123L, 345L, 234L};
for (long windowId : windowIds) {
@@ -506,13 +501,8 @@ public class StreamingContainerManagerTest
@Test
public void testProcessHeartbeat() throws Exception
{
- FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run
-
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
- dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+ dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
StreamingContainerManager scm = new StreamingContainerManager(dag);
@@ -655,9 +645,6 @@ public class StreamingContainerManagerTest
@Test
public void testValidGenericOperatorDeployInfoType()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
TestGeneratorInputOperator.ValidGenericOperator o2 = dag.addOperator("o2", TestGeneratorInputOperator.ValidGenericOperator.class);
@@ -683,9 +670,6 @@ public class StreamingContainerManagerTest
@Test
public void testValidInputOperatorDeployInfoType()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
TestGeneratorInputOperator.ValidInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.ValidInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -711,8 +695,6 @@ public class StreamingContainerManagerTest
@Test
public void testOperatorShutdown()
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
@@ -781,7 +763,6 @@ public class StreamingContainerManagerTest
private void testDownStreamPartition(Locality locality) throws Exception
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
@@ -815,8 +796,7 @@ public class StreamingContainerManagerTest
@Test
public void testPhysicalPropertyUpdate() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -837,10 +817,9 @@ public class StreamingContainerManagerTest
lc.shutdown();
}
- private LogicalPlan getTestAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass,
+ private void setupAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass,
Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass)
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
TestAppDataQueryOperator q = dag.addOperator("q", qClass);
TestAppDataResultOperator r = dag.addOperator("r", rClass);
@@ -854,13 +833,11 @@ public class StreamingContainerManagerTest
dag.addStream("o1-to-ds", o1.outport, ds.inport1);
dag.addStream("q-to-ds", q.outport, ds.query);
dag.addStream("ds-to-r", ds.result, r.inport);
-
- return dag;
}
- private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
+ private void testAppDataSources(boolean appendQIDToTopic) throws Exception
{
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
StreamingContainerManager dnmgr = lc.dnmgr;
@@ -883,22 +860,22 @@ public class StreamingContainerManagerTest
@Test
public void testGetAppDataSources1() throws Exception
{
- LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class);
- testAppDataSources(dag, true);
+ setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class);
+ testAppDataSources(true);
}
@Test
public void testGetAppDataSources2() throws Exception
{
- LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class);
- testAppDataSources(dag, false);
+ setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class);
+ testAppDataSources(false);
}
@Test
public void testGetAppDataSources3() throws Exception
{
- LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class);
- testAppDataSources(dag, false);
+ setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class);
+ testAppDataSources(false);
}
@Test
@@ -933,8 +910,7 @@ public class StreamingContainerManagerTest
try {
server.start();
int port = server.getPort();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport", o1.outport, o2.inport1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index a1312a5..f6451c9 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -176,6 +177,14 @@ public class AutoMetricTest
}
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
+
/**
* Verify custom stats generated by operator are propagated and trigger repartition.
*
@@ -185,8 +194,7 @@ public class AutoMetricTest
@SuppressWarnings("SleepWhileInLoop")
public void testMetricPropagation() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -234,9 +242,7 @@ public class AutoMetricTest
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
@@ -264,8 +270,7 @@ public class AutoMetricTest
CountDownLatch latch = new CountDownLatch(2);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
@@ -289,8 +294,7 @@ public class AutoMetricTest
public void testInjectionOfDefaultMetricsAggregator() throws Exception
{
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
+
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
@@ -365,9 +369,7 @@ public class AutoMetricTest
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index cef671e..6df4e94 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -43,6 +43,7 @@ import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -52,7 +53,11 @@ import com.datatorrent.stram.tuple.Tuple;
*/
public class ProcessingModeTests
{
- @Rule public TestMeta testMeta = new TestMeta();
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ private LogicalPlan dag;
+
ProcessingMode processingMode;
int maxTuples = 30;
@@ -64,6 +69,7 @@ public class ProcessingModeTests
@Before
public void setup() throws IOException
{
+ dag = StramTestSupport.createDAG(testMeta);
StreamingContainer.eventloop.start();
}
@@ -79,11 +85,10 @@ public class ProcessingModeTests
CollectorOperator.collection.clear();
CollectorOperator.duplicates.clear();
- LogicalPlan dag = new LogicalPlan();
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
rip.setMaximumTuples(maxTuples);
rip.setSimulateFailure(true);
@@ -105,8 +110,7 @@ public class ProcessingModeTests
CollectorOperator.collection.clear();
CollectorOperator.duplicates.clear();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -130,8 +134,7 @@ public class ProcessingModeTests
CollectorOperator.collection.clear();
CollectorOperator.duplicates.clear();
- LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null));
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null));
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index 1839c91..5dca8a4 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
@@ -203,10 +204,17 @@ public class StreamPersistanceTests
{
}
+ private LogicalPlan dag;
+
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ }
+
@Test
public void testPersistStreamOperatorIsAdded()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
TestRecieverOperator persister = new TestRecieverOperator();
@@ -222,7 +230,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOperatorIsAddedPerSink()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -255,7 +262,6 @@ public class StreamPersistanceTests
public void testaddStreamThrowsExceptionOnInvalidLoggerType()
{
// Test Logger with non-optional output ports
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
@@ -302,7 +308,6 @@ public class StreamPersistanceTests
public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType()
{
// Test for input port belonging to different object
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
TestRecieverOperator persister = new TestRecieverOperator();
@@ -322,7 +327,6 @@ public class StreamPersistanceTests
public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved()
{
// Remove Stream and check if persist operator is removed
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
TestRecieverOperator persister = new TestRecieverOperator();
@@ -340,7 +344,6 @@ public class StreamPersistanceTests
public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved()
{
// Remove sink and check if corresponding persist operator is removed
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -383,7 +386,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
@@ -409,7 +411,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator input1 = dag.addOperator("input1", AscendingNumbersOperator.class);
// Add PersistOperator directly to dag
final TestRecieverOperator x = dag.addOperator("x", new TestRecieverOperator());
@@ -603,7 +604,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -617,7 +617,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -632,7 +631,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
PassThruOperatorWithCodec passThru2 = dag.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3));
@@ -696,7 +694,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PassThruOperatorWithCodec passThru1 = dag.addOperator("PassThrough1", new PassThruOperatorWithCodec(2));
PassThruOperatorWithCodec passThru2 = dag.addOperator("PassThrough2", new PassThruOperatorWithCodec(3));
@@ -829,7 +826,6 @@ public class StreamPersistanceTests
@Test
public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -883,7 +879,6 @@ public class StreamPersistanceTests
@Test
public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException
{
- LogicalPlan dag = new LogicalPlan();
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
@@ -942,10 +937,6 @@ public class StreamPersistanceTests
@Test
public void testDynamicPartitioning() throws ClassNotFoundException, IOException
{
- LogicalPlan dag = new LogicalPlan();
-
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
-
AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 314fdfc..cf2a887 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -22,6 +22,10 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -50,6 +54,7 @@ import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.WindowGenerator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -253,23 +258,54 @@ abstract public class StramTestSupport
public static class TestMeta extends TestWatcher
{
- public String dir = null;
+ private File dir;
@Override
protected void starting(org.junit.runner.Description description)
{
- String methodName = description.getMethodName();
- String className = description.getClassName();
- //className = className.substring(className.lastIndexOf('.') + 1);
- this.dir = "target/" + className + "/" + methodName;
- new File(this.dir).mkdirs();
+ final String methodName = description.getMethodName();
+ final String className = description.getClassName();
+ dir = new File("target/" + className + "/" + methodName); // per-test working directory under target/
+ try {
+ Files.createDirectories(dir.toPath());
+ } catch (FileAlreadyExistsException e) { // something that is not a directory occupies the path
+ try {
+ Files.delete(dir.toPath()); // remove the stale entry, then retry the creation
+ Files.createDirectories(dir.toPath());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), ioe); // cause is the retry failure, not the earlier exception
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), e);
+ }
}
@Override
protected void finished(org.junit.runner.Description description)
{
- FileUtils.deleteQuietly(new File(this.dir));
+ FileUtils.deleteQuietly(dir);
+ }
+
+ public String getPath()
+ {
+ return dir.getPath();
+ }
+
+ public String getAbsolutePath()
+ {
+ return dir.getAbsolutePath();
+ }
+
+ public Path toPath()
+ {
+ return dir.toPath();
+ }
+
+ public URI toURI()
+ {
+ return dir.toURI();
}
+
}
public static class TestHomeDirectory extends TestWatcher
@@ -333,6 +369,20 @@ abstract public class StramTestSupport
}
}
+ public static LogicalPlan createDAG(final TestMeta testMeta, final String suffix)
+ {
+ if (suffix == null) {
+ throw new NullPointerException();
+ }
+ LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.getPath() + suffix);
+ return dag;
+ }
+
+ public static LogicalPlan createDAG(final TestMeta testMeta)
+ {
+ return createDAG(testMeta, "");
+ }
public static class MemoryStorageAgent implements StorageAgent, Serializable
{