You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/01/10 02:55:49 UTC

[1/2] apex-core git commit: APEXCORE-598 Write checkpoints to APPLICATION_PATH in embedded execution mode.

Repository: apex-core
Updated Branches:
  refs/heads/master 51de67e61 -> d9bc67d5a


APEXCORE-598 Write checkpoints to APPLICATION_PATH in embedded execution mode.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f6f6d5f5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f6f6d5f5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f6f6d5f5

Branch: refs/heads/master
Commit: f6f6d5f541bb88301a4b401392e848ee0d2bc3c9
Parents: 05c798d
Author: Thomas Weise <th...@apache.org>
Authored: Tue Jan 3 09:59:52 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Jan 9 10:23:09 2017 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    | 48 +++++++++++++-------
 .../stram/StramLocalClusterTest.java            | 17 +++++++
 2 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 14a2827..e188b60 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -58,7 +58,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTOperator;
 
 /**
- * Launcher for topologies in local mode within a single process.
+ * Launcher for topologies in embedded mode within a single process.
  * Child containers are mapped to threads.
  *
  * @since 0.3.2
@@ -67,7 +67,7 @@ public class StramLocalCluster implements Runnable, Controller
 {
   private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
   // assumes execution as unit test
-  private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
+  private static final File DEFAULT_APP_DIR = new File("target", StramLocalCluster.class.getName());
   private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
   private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost");
   protected final StreamingContainerManager dnmgr;
@@ -283,11 +283,24 @@ public class StramLocalCluster implements Runnable, Controller
     dag.validate();
     // ensure plan can be serialized
     cloneLogicalPlan(dag);
-    // convert to URI so we always write to local file system,
-    // even when the environment has a default HDFS location.
-    String pathUri = CLUSTER_WORK_DIR.toURI().toString();
+    final Path pathUri;
+    String appPath = dag.getAttributes().get(LogicalPlan.APPLICATION_PATH);
+    if (appPath == null) {
+      // convert to URI so we always write to local file system,
+      // even when the environment has a default HDFS location.
+      pathUri = new Path(DEFAULT_APP_DIR.toURI());
+      dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri.toString());
+    } else {
+      // should accept any valid path URI (or relative path) provided by user
+      Path tmp = new Path(appPath);
+      if (!tmp.isAbsolute()) {
+        pathUri = new Path(new File(appPath).toURI());
+      } else {
+        pathUri = tmp;
+      }
+    }
     try {
-      FileContext.getLocalFSFileContext().delete(new Path(pathUri/*CLUSTER_WORK_DIR.getAbsolutePath()*/), true);
+      FileContext.getLocalFSFileContext().delete(pathUri, true);
     } catch (IllegalArgumentException e) {
       throw e;
     } catch (IOException e) {
@@ -295,22 +308,11 @@ public class StramLocalCluster implements Runnable, Controller
     }
 
     dag.getAttributes().put(LogicalPlan.APPLICATION_ID, "app_local_" + System.currentTimeMillis());
-    if (dag.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) {
-      dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri);
-    }
     if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
       dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
     }
     this.dnmgr = new StreamingContainerManager(dag);
     this.umbilical = new UmbilicalProtocolLocalImpl();
-
-    if (!perContainerBufferServer) {
-      StreamingContainer.eventloop.start();
-      bufferServer = new Server(0, 1024 * 1024,8);
-      bufferServer.setSpoolStorage(new DiskStorage());
-      bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
-      LOG.info("Buffer server started: {}", bufferServerAddress);
-    }
   }
 
   public static LogicalPlan cloneLogicalPlan(LogicalPlan lp) throws IOException, ClassNotFoundException
@@ -442,6 +444,18 @@ public class StramLocalCluster implements Runnable, Controller
   @SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"})
   public void run(long runMillis)
   {
+    if (!perContainerBufferServer) {
+      StreamingContainer.eventloop.start();
+      bufferServer = new Server(0, 1024 * 1024,8);
+      try {
+        bufferServer.setSpoolStorage(new DiskStorage());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
+      LOG.info("Buffer server started: {}", bufferServerAddress);
+    }
+
     long endMillis = System.currentTimeMillis() + runMillis;
     List<Thread> containerThreads = new LinkedList<>();
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 1a5046c..5bea0b3 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
@@ -395,5 +396,21 @@ public class StramLocalClusterTest
     return new File(destDir, pojoClassName + ".jar").getAbsolutePath();
   }
 
+  @Test
+  public void testAppPath() throws Exception
+  {
+    // add operator for initial checkpoint
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    o1.setMaxTuples(1);
+    File relPath = new File(dag.getAttributes().get(DAGContext.APPLICATION_PATH));
+    String uriPath = relPath.toURI().toString();
+    dag.setAttribute(DAGContext.APPLICATION_PATH, uriPath);
+    StramLocalCluster cluster = new StramLocalCluster(dag);
+    // no need for run(), just need the initial checkpoint
+    Assert.assertFalse(cluster.isFinished());
+    Assert.assertTrue("app path exists", relPath.exists() && relPath.isDirectory());
+    File checkPointDir = new File(relPath, LogicalPlan.SUBDIR_CHECKPOINTS);
+    Assert.assertTrue("checkpoint path exists", checkPointDir.exists() && checkPointDir.isDirectory());
+  }
 
 }


[2/2] apex-core git commit: Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core

Posted by vr...@apache.org.
Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d9bc67d5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d9bc67d5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d9bc67d5

Branch: refs/heads/master
Commit: d9bc67d5a62ca00d7ff1aea9a308fc8c6cf4be52
Parents: 51de67e f6f6d5f
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Jan 9 18:52:55 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Jan 9 18:52:55 2017 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    | 48 +++++++++++++-------
 .../stram/StramLocalClusterTest.java            | 17 +++++++
 2 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------