You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/25 19:50:52 UTC

[4/4] incubator-systemml git commit: [SYSTEMML-1309] Fix parfor spark working dir delete on shutdown

[SYSTEMML-1309] Fix parfor spark working dir delete on shutdown

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b78c1259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b78c1259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b78c1259

Branch: refs/heads/master
Commit: b78c125934fa7a947a5118f0c08473afa926fa5d
Parents: b028e6c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 22:00:33 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:06 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSparkWorker.java       | 28 ++++++++-----
 .../parfor/RemoteParForSparkWorker.java         | 43 +++++++++++---------
 .../parfor/RemoteParForUtils.java               | 26 ++++++++++--
 .../sysml/runtime/util/LocalFileUtils.java      |  2 +-
 4 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index ad0fbf8..458f149 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
@@ -145,21 +146,28 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_numTasks    = 0;
 		_numIters    = 0;
 
-		//init local cache manager 
-		if( !CacheableData.isCachingActive() ) {
-			String uuid = IDHandler.createDistributedUniqueID();
-			LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-			CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-		}		
-		if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-			CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+		//init and register-cleanup of buffer pool: in parfor spark, multiple tasks might 
+		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
+		//the initialization and immediately register the created directory for cleanup
+		//on process exit, i.e., executor exit, including any files created in the future.
+		synchronized( CacheableData.class ) {
+			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
+				//create id, executor working dir, and cache dir
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+				CacheableData.cacheEvictionLocalFilePrefix = 
+						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+				//register entire working dir for delete on shutdown
+				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+			}	
 		}
 		
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
-		//enable/disable caching (if required)
-		if( !_caching )
+		//enable/disable caching (if required and not in CP process)
+		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
 			CacheableData.disableCaching();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e12376a..2ea802d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 
@@ -35,23 +36,20 @@ import scala.Tuple2;
 
 public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> 
 {
-	
 	private static final long serialVersionUID = -3254950138084272296L;
 
+	private final String  _prog;
 	private boolean _initialized = false;
-	private String  _prog = null;
 	private boolean _caching = true;
 	
-	private LongAccumulator _aTasks = null;
-	private LongAccumulator _aIters = null;
+	private final LongAccumulator _aTasks;
+	private final LongAccumulator _aIters;
 	
 	public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
-		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
-		//when this constructor is actually called; hence, we do lazy initialization on task execution)
-		_initialized = false;
 		_prog = program;
+		_initialized = false;
 		_caching = cpCaching;
 		
 		//setup spark accumulators
@@ -65,7 +63,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	{
 		//lazy parworker initialization
 		if( !_initialized )
-			configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
+			configureWorker( TaskContext.get().taskAttemptId() );
 		
 		//execute a single task
 		long numIter = getExecutedIterations();
@@ -98,24 +96,31 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		_numTasks    = 0;
 		_numIters    = 0;
 
-		//init local cache manager 
-		if( !CacheableData.isCachingActive() ) {
-			String uuid = IDHandler.createDistributedUniqueID();
-			LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-			CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-		}		
-		if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-			CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+		//init and register-cleanup of buffer pool: in parfor spark, multiple tasks might 
+		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
+		//the initialization and immediately register the created directory for cleanup
+		//on process exit, i.e., executor exit, including any files created in the future.
+		synchronized( CacheableData.class ) {
+			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
+				//create id, executor working dir, and cache dir
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+				CacheableData.cacheEvictionLocalFilePrefix = 
+						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+				//register entire working dir for delete on shutdown
+				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+			}	
 		}
 		
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
-		//enable/disable caching (if required)
-		if( !_caching )
+		//enable/disable caching (if required and not in CP process)
+		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
 			CacheableData.disableCaching();
 		
-		//make as lazily intialized
+		//mark as initialized
 		_initialized = true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 7b3ecb1..fd99429 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -190,12 +190,9 @@ public class RemoteParForUtils
 		
 		return ret;
 	}
-		
 	
 	/**
-	 * Cleanup all temporary files created by this SystemML process
-	 * instance.
-	 * 
+	 * Cleanup all temporary files created by this SystemML process.
 	 */
 	public static void cleanupWorkingDirectories()
 	{
@@ -216,6 +213,15 @@ public class RemoteParForUtils
 		}
 	}
 
+	/**
+	 * Registers a JVM shutdown hook that cleans up all temporary files
+	 * created by this SystemML process, on shutdown via exit or interrupt.
+	 */
+	public static void cleanupWorkingDirectoriesOnShutdown() {
+		Runtime.getRuntime().addShutdownHook(
+				new DeleteWorkingDirectoriesTask());
+	}
+	
 	public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) 
 		throws DMLRuntimeException
 	{
@@ -241,4 +247,16 @@ public class RemoteParForUtils
 		//create return array
 		return tmp.values().toArray(new LocalVariableMap[0]);	
 	}
+	
+	/**
+	 * Task to be registered as shutdown hook in order to delete 
+	 * all working directories, including any remaining files, which 
+	 * might not have been created at the time of registration.
+	 */
+	private static class DeleteWorkingDirectoriesTask extends Thread {
+		@Override
+		public void run() {
+			cleanupWorkingDirectories();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index c868f60..0086f8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -305,7 +305,7 @@ public class LocalFileUtils
 		return createWorkingDirectoryWithUUID( DMLScript.getUUID() );
 	}
 
-	public static synchronized String createWorkingDirectoryWithUUID( String uuid )
+	public static String createWorkingDirectoryWithUUID( String uuid )
 		throws DMLRuntimeException 
 	{
 		//create local tmp dir if not existing