You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/25 19:50:52 UTC
[4/4] incubator-systemml git commit: [SYSTEMML-1309] Fix parfor spark
working dir delete on shutdown
[SYSTEMML-1309] Fix parfor spark working dir delete on shutdown
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b78c1259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b78c1259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b78c1259
Branch: refs/heads/master
Commit: b78c125934fa7a947a5118f0c08473afa926fa5d
Parents: b028e6c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 22:00:33 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:06 2017 -0800
----------------------------------------------------------------------
.../parfor/RemoteDPParForSparkWorker.java | 28 ++++++++-----
.../parfor/RemoteParForSparkWorker.java | 43 +++++++++++---------
.../parfor/RemoteParForUtils.java | 26 ++++++++++--
.../sysml/runtime/util/LocalFileUtils.java | 2 +-
4 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index ad0fbf8..458f149 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
@@ -145,21 +146,28 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
_numTasks = 0;
_numIters = 0;
- //init local cache manager
- if( !CacheableData.isCachingActive() ) {
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
- }
- if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
- CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
+ //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might
+ //share the process-local, i.e., per executor, buffer pool; hence we synchronize
+ //the initialization and immediately register the created directory for cleanup
+ //on process exit, i.e., executor exit, including any files created in the future).
+ synchronized( CacheableData.class ) {
+ if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) {
+ //create id, executor working dir, and cache dir
+ String uuid = IDHandler.createDistributedUniqueID();
+ LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+ CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+ CacheableData.cacheEvictionLocalFilePrefix =
+ CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
+ //register entire working dir for delete on shutdown
+ RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+ }
}
//ensure that resultvar files are not removed
super.pinResultVariables();
- //enable/disable caching (if required)
- if( !_caching )
+ //enable/disable caching (if required and not in CP process)
+ if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
CacheableData.disableCaching();
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e12376a..2ea802d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.util.LocalFileUtils;
@@ -35,23 +36,20 @@ import scala.Tuple2;
public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String>
{
-
private static final long serialVersionUID = -3254950138084272296L;
+ private final String _prog;
private boolean _initialized = false;
- private String _prog = null;
private boolean _caching = true;
- private LongAccumulator _aTasks = null;
- private LongAccumulator _aIters = null;
+ private final LongAccumulator _aTasks;
+ private final LongAccumulator _aIters;
public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters)
throws DMLRuntimeException
{
- //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
- //when this constructor is actually called; hence, we do lazy initialization on task execution)
- _initialized = false;
_prog = program;
+ _initialized = false;
_caching = cpCaching;
//setup spark accumulators
@@ -65,7 +63,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
{
//lazy parworker initialization
if( !_initialized )
- configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
+ configureWorker( TaskContext.get().taskAttemptId() );
//execute a single task
long numIter = getExecutedIterations();
@@ -98,24 +96,31 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
_numTasks = 0;
_numIters = 0;
- //init local cache manager
- if( !CacheableData.isCachingActive() ) {
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
- }
- if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
- CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
+ //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might
+ //share the process-local, i.e., per executor, buffer pool; hence we synchronize
+ //the initialization and immediately register the created directory for cleanup
+ //on process exit, i.e., executor exit, including any files created in the future).
+ synchronized( CacheableData.class ) {
+ if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) {
+ //create id, executor working dir, and cache dir
+ String uuid = IDHandler.createDistributedUniqueID();
+ LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+ CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+ CacheableData.cacheEvictionLocalFilePrefix =
+ CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
+ //register entire working dir for delete on shutdown
+ RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+ }
}
//ensure that resultvar files are not removed
super.pinResultVariables();
- //enable/disable caching (if required)
- if( !_caching )
+ //enable/disable caching (if required and not in CP process)
+ if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
CacheableData.disableCaching();
- //make as lazily intialized
+ //mark as initialized
_initialized = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 7b3ecb1..fd99429 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -190,12 +190,9 @@ public class RemoteParForUtils
return ret;
}
-
/**
- * Cleanup all temporary files created by this SystemML process
- * instance.
- *
+ * Cleanup all temporary files created by this SystemML process.
*/
public static void cleanupWorkingDirectories()
{
@@ -216,6 +213,15 @@ public class RemoteParForUtils
}
}
+ /**
+ * Register a shutdown hook that cleans up all temporary files created by
+ * this SystemML process on shutdown via exit or interrupt.
+ */
+ public static void cleanupWorkingDirectoriesOnShutdown() {
+ Runtime.getRuntime().addShutdownHook(
+ new DeleteWorkingDirectoriesTask());
+ }
+
public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG )
throws DMLRuntimeException
{
@@ -241,4 +247,16 @@ public class RemoteParForUtils
//create return array
return tmp.values().toArray(new LocalVariableMap[0]);
}
+
+ /**
+ * Task to be registered as a shutdown hook in order to delete all
+ * working directories, including any remaining files, which
+ * might not have been created at time of registration.
+ */
+ private static class DeleteWorkingDirectoriesTask extends Thread {
+ @Override
+ public void run() {
+ cleanupWorkingDirectories();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index c868f60..0086f8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -305,7 +305,7 @@ public class LocalFileUtils
return createWorkingDirectoryWithUUID( DMLScript.getUUID() );
}
- public static synchronized String createWorkingDirectoryWithUUID( String uuid )
+ public static String createWorkingDirectoryWithUUID( String uuid )
throws DMLRuntimeException
{
//create local tmp dir if not existing