You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/01/10 02:55:49 UTC
[1/2] apex-core git commit: APEXCORE-598 Write checkpoints to APPLICATION_PATH in embedded execution mode.
Repository: apex-core
Updated Branches:
refs/heads/master 51de67e61 -> d9bc67d5a
APEXCORE-598 Write checkpoints to APPLICATION_PATH in embedded execution mode.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f6f6d5f5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f6f6d5f5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f6f6d5f5
Branch: refs/heads/master
Commit: f6f6d5f541bb88301a4b401392e848ee0d2bc3c9
Parents: 05c798d
Author: Thomas Weise <th...@apache.org>
Authored: Tue Jan 3 09:59:52 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Jan 9 10:23:09 2017 -0800
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 48 +++++++++++++-------
.../stram/StramLocalClusterTest.java | 17 +++++++
2 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 14a2827..e188b60 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -58,7 +58,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTOperator;
/**
- * Launcher for topologies in local mode within a single process.
+ * Launcher for topologies in embedded mode within a single process.
* Child containers are mapped to threads.
*
* @since 0.3.2
@@ -67,7 +67,7 @@ public class StramLocalCluster implements Runnable, Controller
{
private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
// assumes execution as unit test
- private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
+ private static final File DEFAULT_APP_DIR = new File("target", StramLocalCluster.class.getName());
private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost");
protected final StreamingContainerManager dnmgr;
@@ -283,11 +283,24 @@ public class StramLocalCluster implements Runnable, Controller
dag.validate();
// ensure plan can be serialized
cloneLogicalPlan(dag);
- // convert to URI so we always write to local file system,
- // even when the environment has a default HDFS location.
- String pathUri = CLUSTER_WORK_DIR.toURI().toString();
+ final Path pathUri;
+ String appPath = dag.getAttributes().get(LogicalPlan.APPLICATION_PATH);
+ if (appPath == null) {
+ // convert to URI so we always write to local file system,
+ // even when the environment has a default HDFS location.
+ pathUri = new Path(DEFAULT_APP_DIR.toURI());
+ dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri.toString());
+ } else {
+ // should accept any valid path URI (or relative path) provided by user
+ Path tmp = new Path(appPath);
+ if (!tmp.isAbsolute()) {
+ pathUri = new Path(new File(appPath).toURI());
+ } else {
+ pathUri = tmp;
+ }
+ }
try {
- FileContext.getLocalFSFileContext().delete(new Path(pathUri/*CLUSTER_WORK_DIR.getAbsolutePath()*/), true);
+ FileContext.getLocalFSFileContext().delete(pathUri, true);
} catch (IllegalArgumentException e) {
throw e;
} catch (IOException e) {
@@ -295,22 +308,11 @@ public class StramLocalCluster implements Runnable, Controller
}
dag.getAttributes().put(LogicalPlan.APPLICATION_ID, "app_local_" + System.currentTimeMillis());
- if (dag.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) {
- dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri);
- }
if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
}
this.dnmgr = new StreamingContainerManager(dag);
this.umbilical = new UmbilicalProtocolLocalImpl();
-
- if (!perContainerBufferServer) {
- StreamingContainer.eventloop.start();
- bufferServer = new Server(0, 1024 * 1024,8);
- bufferServer.setSpoolStorage(new DiskStorage());
- bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
- LOG.info("Buffer server started: {}", bufferServerAddress);
- }
}
public static LogicalPlan cloneLogicalPlan(LogicalPlan lp) throws IOException, ClassNotFoundException
@@ -442,6 +444,18 @@ public class StramLocalCluster implements Runnable, Controller
@SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"})
public void run(long runMillis)
{
+ if (!perContainerBufferServer) {
+ StreamingContainer.eventloop.start();
+ bufferServer = new Server(0, 1024 * 1024,8);
+ try {
+ bufferServer.setSpoolStorage(new DiskStorage());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
+ LOG.info("Buffer server started: {}", bufferServerAddress);
+ }
+
long endMillis = System.currentTimeMillis() + runMillis;
List<Thread> containerThreads = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index 1a5046c..5bea0b3 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
@@ -395,5 +396,21 @@ public class StramLocalClusterTest
return new File(destDir, pojoClassName + ".jar").getAbsolutePath();
}
+ @Test
+ public void testAppPath() throws Exception
+ {
+ // add operator for initial checkpoint
+ TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+ o1.setMaxTuples(1);
+ File relPath = new File(dag.getAttributes().get(DAGContext.APPLICATION_PATH));
+ String uriPath = relPath.toURI().toString();
+ dag.setAttribute(DAGContext.APPLICATION_PATH, uriPath);
+ StramLocalCluster cluster = new StramLocalCluster(dag);
+ // no need for run(), just need the initial checkpoint
+ Assert.assertFalse(cluster.isFinished());
+ Assert.assertTrue("app path exists", relPath.exists() && relPath.isDirectory());
+ File checkPointDir = new File(relPath, LogicalPlan.SUBDIR_CHECKPOINTS);
+ Assert.assertTrue("checkpoint path exists", checkPointDir.exists() && checkPointDir.isDirectory());
+ }
}
[2/2] apex-core git commit: Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core
Posted by vr...@apache.org.
Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d9bc67d5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d9bc67d5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d9bc67d5
Branch: refs/heads/master
Commit: d9bc67d5a62ca00d7ff1aea9a308fc8c6cf4be52
Parents: 51de67e f6f6d5f
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Jan 9 18:52:55 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Jan 9 18:52:55 2017 -0800
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 48 +++++++++++++-------
.../stram/StramLocalClusterTest.java | 17 +++++++
2 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------