You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:48 UTC

[17/50] incubator-apex-core git commit: fixed tests

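fixed tests: set an AsyncFSStorageAgent as the STORAGE_AGENT attribute in the affected tests (StreamingContainerManagerTest now uses testMeta.dir; AtLeastOnceTest enables synchronous checkpointing)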


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1617ca39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1617ca39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1617ca39

Branch: refs/heads/master
Commit: 1617ca393c1e066349282dfb6d9778aea8c67177
Parents: c5d819b
Author: Gaurav <ga...@datatorrent.com>
Authored: Thu Aug 6 17:31:21 2015 -0700
Committer: Gaurav <ga...@datatorrent.com>
Committed: Thu Aug 6 21:12:51 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerManagerTest.java        |  7 +++----
 .../datatorrent/stram/engine/AtLeastOnceTest.java   | 16 ++++++++++++++++
 .../stram/engine/StreamingContainerTest.java        |  6 ++++++
 .../datatorrent/stram/stream/OiOEndWindowTest.java  |  5 +++++
 4 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 38a54f0..a238e3e 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -740,8 +740,7 @@ public class StreamingContainerManagerTest {
   @Test
   public void testPhysicalPropertyUpdate() throws Exception{
     LogicalPlan dag = new LogicalPlan();
-    String workingDir = new File("target/testPhysicalPropertyUpdate").getAbsolutePath();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     dag.addStream("o1.outport", o1.outport, o2.inport1);
@@ -784,8 +783,7 @@ public class StreamingContainerManagerTest {
 
   private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception
   {
-    String workingDir = new File("target/testAppDataSources").getAbsolutePath();
-    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     StramLocalCluster lc = new StramLocalCluster(dag);
     lc.runAsync();
     StreamingContainerManager dnmgr = lc.dnmgr;
@@ -859,6 +857,7 @@ public class StreamingContainerManagerTest {
     try {
       server.start();
       LogicalPlan dag = new LogicalPlan();
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
       TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
       GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
       dag.addStream("o1.outport", o1.outport, o2.inport1);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
index 01cc675..f32be13 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.junit.After;
@@ -24,7 +25,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.engine.ProcessingModeTests.CollectorOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -56,6 +60,10 @@ public class AtLeastOnceTest
     CollectorOperator.collection.clear();
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testInputOperatorRecovery").getAbsolutePath();
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    asyncFSStorageAgent.setSyncCheckpoint(true);
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -79,6 +87,10 @@ public class AtLeastOnceTest
     CollectorOperator.collection.clear();
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    asyncFSStorageAgent.setSyncCheckpoint(true);
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
@@ -103,6 +115,10 @@ public class AtLeastOnceTest
     CollectorOperator.collection.clear();
     int maxTuples = 30;
     LogicalPlan dag = new LogicalPlan();
+    String workingDir = new File("target/testOperatorRecovery").getAbsolutePath();
+    AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null);
+    asyncFSStorageAgent.setSyncCheckpoint(true);
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
     //dag.getAttributes().get(DAG.HEARTBEAT_INTERVAL_MILLIS, 400);
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 911f69a..7d37429 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -24,7 +25,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.CheckpointListener;
@@ -42,6 +46,8 @@ public class StreamingContainerTest
   public void testCommitted() throws IOException, ClassNotFoundException
   {
     LogicalPlan lp = new LogicalPlan();
+    String workingDir = new File("target/testCommitted").getAbsolutePath();
+    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
     CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index 38f7a0b..a4e9c43 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -15,6 +15,9 @@
  */
 package com.datatorrent.stram.stream;
 
+import java.io.File;
+
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.*;
 import org.junit.Assert;
@@ -93,6 +96,8 @@ public class OiOEndWindowTest
   public void validateOiOImplementation() throws Exception
   {
     LogicalPlan lp = new LogicalPlan();
+    String workingDir = new File("target/validateOiOImplementation").getAbsolutePath();
+    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null));
     TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator());
     FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator());
     SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());