You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/05/07 02:12:43 UTC

systemml git commit: [SYSTEMML-2305] Fix bufferpool memory leak repeated spark remote parfor

Repository: systemml
Updated Branches:
  refs/heads/master 3d61fddd9 -> f393ebe31


[SYSTEMML-2305] Fix bufferpool memory leak repeated spark remote parfor

This patch fixes increasing slowdowns on repeated spark remote parfor
loops (where the entire parfor loop is executed as a spark job) caused by
missing cleanup of the buffer pool. Specifically, result variables
(which are pinned until write) and specific temporary intermediates
(from inner loops) were not properly removed from the in-memory buffer
pool, leading to unnecessary evictions on subsequent jobs.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f393ebe3
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f393ebe3
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f393ebe3

Branch: refs/heads/master
Commit: f393ebe31fe9a7c03ab7cef08a45cbb1a8e5533e
Parents: 3d61fdd
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun May 6 19:13:51 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun May 6 19:13:51 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  6 +++++
 .../controlprogram/ParForProgramBlock.java      |  3 ++-
 .../controlprogram/caching/CacheableData.java   |  6 ++---
 .../parfor/CachedReuseVariables.java            |  4 +++-
 .../parfor/RemoteParForSpark.java               |  7 +++---
 .../parfor/RemoteParForSparkWorker.java         | 24 +++++++++++++++-----
 .../parfor/RemoteParForUtils.java               |  2 ++
 7 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 3fa4b72..3ee6bc6 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -445,6 +445,12 @@ public class OptimizerUtils
 		return InfrastructureAnalyzer.getLocalParallelism() == k;
 	}
 
+	public static boolean isTopLevelParFor() {
+		//since every local parfor with degree of parallelism k>1 changes the
+		//local memory budget, we can simply probe the current memory fraction
+		return InfrastructureAnalyzer.getLocalMaxMemoryFraction() >= 0.99;
+	}
+	
 	public static boolean checkSparkBroadcastMemoryBudget( double size )
 	{
 		double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget();

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index ecbbb6f..732e59d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1043,8 +1043,9 @@ public class ParForProgramBlock extends ForProgramBlock
 		exportMatricesToHDFS(ec);
 		
 		// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
+		boolean topLevelPF = OptimizerUtils.isTopLevelParFor();
 		RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program,
-			clsMap, tasks, ec, _resultVars, _enableCPCaching, _numThreads);
+			clsMap, tasks, ec, _resultVars, _enableCPCaching, _numThreads, topLevelPF);
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 3682fe1..73f6c08 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -836,17 +836,17 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	 * evicted data blob, without reading it.
 	 * Must be defined by a subclass, never called by users.
 	 */
-	protected void freeEvictedBlob() {
+	public final void freeEvictedBlob() {
 		String cacheFilePathAndName = getCacheFilePathAndName();
 		long begin = LOG.isTraceEnabled() ? System.currentTimeMillis() : 0;
 		if( LOG.isTraceEnabled() )
 			LOG.trace("CACHE: Freeing evicted matrix...  " + hashCode() + "  HDFS path: " + 
-						(_hdfsFileName == null ? "null" : _hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
+				(_hdfsFileName == null ? "null" : _hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
 		
 		LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
 		
 		if( LOG.isTraceEnabled() )
-			LOG.trace("Freeing evicted matrix - COMPLETED ... " + (System.currentTimeMillis()-begin) + " msec.");		
+			LOG.trace("Freeing evicted matrix - COMPLETED ... " + (System.currentTimeMillis()-begin) + " msec.");
 	}
 
 	protected boolean isBelowCachingThreshold() {

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
index 3db8e7a..352da3f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
@@ -46,7 +46,7 @@ public class CachedReuseVariables
 	}
 	
 	@SuppressWarnings("unused")
-	public synchronized void reuseVariables(long pfid, LocalVariableMap vars, Collection<String> blacklist, Map<String, Broadcast<CacheBlock>> _brInputs) {
+	public synchronized void reuseVariables(long pfid, LocalVariableMap vars, Collection<String> blacklist, Map<String, Broadcast<CacheBlock>> _brInputs, boolean cleanCache) {
 
 		//fetch the broadcast variables
 		if (ParForProgramBlock.ALLOW_BROADCAST_INPUTS && !containsVars(pfid)) {
@@ -60,6 +60,8 @@ public class CachedReuseVariables
 		
 		//build reuse map if not created yet or evicted
 		if( tmp == null ) {
+			if( cleanCache )
+				_data.clear();
 			tmp = new LocalVariableMap(vars);
 			tmp.removeAllIn((blacklist instanceof HashSet) ?
 				(HashSet<String>)blacklist : new HashSet<>(blacklist));

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 7d75cb6..7319cd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -69,8 +69,8 @@ public class RemoteParForSpark
 	//globally unique id for parfor spark job instances (unique across spark contexts)
 	private static final IDSequence _jobID = new IDSequence();
 	
-	public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, 
-			List<Task> tasks, ExecutionContext ec, ArrayList<ResultVar> resultVars, boolean cpCaching, int numMappers) 
+	public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, List<Task> tasks,
+		ExecutionContext ec, ArrayList<ResultVar> resultVars, boolean cpCaching, int numMappers, boolean topLevelPF)
 	{
 		String jobname = "ParFor-ESP";
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -96,7 +96,8 @@ public class RemoteParForSpark
 		//run remote_spark parfor job 
 		//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
 		List<Tuple2<Long,String>> out = sc.parallelize(tasks, tasks.size()) //create rdd of parfor tasks
-			.flatMapToPair(new RemoteParForSparkWorker(jobid, prog, clsMap, cpCaching, aTasks, aIters, brInputs))
+			.flatMapToPair(new RemoteParForSparkWorker(jobid, prog,
+				clsMap, cpCaching, aTasks, aIters, brInputs, topLevelPF))
 			.collect(); //execute and get output handles
 		
 		//de-serialize results

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index b22f48d..ab29148 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -22,9 +22,11 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.spark.TaskContext;
@@ -52,22 +54,24 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	private final HashMap<String, byte[]> _clsMap;
 	private boolean _initialized = false;
 	private boolean _caching = true;
+	private final boolean _cleanCache;
 	
 	private final LongAccumulator _aTasks;
 	private final LongAccumulator _aIters;
 
 	private final Map<String, Broadcast<CacheBlock>> _brInputs;
 	
-	public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters, Map<String, Broadcast<CacheBlock>> brInputs) {
+	public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching,
+			LongAccumulator atasks, LongAccumulator aiters, Map<String, Broadcast<CacheBlock>> brInputs, boolean cleanCache) {
 		_jobid = jobid;
 		_prog = program;
 		_clsMap = clsMap;
 		_initialized = false;
 		_caching = cpCaching;
-		//setup spark accumulators
 		_aTasks = atasks;
 		_aIters = aiters;
 		_brInputs = brInputs;
+		_cleanCache = cleanCache;
 	}
 	
 	@Override 
@@ -78,6 +82,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		if( !_initialized )
 			configureWorker(TaskContext.get().taskAttemptId());
 		
+		//keep input var names
+		Set<String> inVars = new HashSet<>(_ec.getVariables().keySet());
+		
 		//execute a single task
 		long numIter = getExecutedIterations();
 		super.executeTask( arg0 );
@@ -86,9 +93,15 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		_aTasks.add( 1 );
 		_aIters.add( (int)(getExecutedIterations()-numIter) );
 		
-		//write output if required (matrix indexed write) 
+		//cleanup remaining intermediate variables from buffer pool
+		_ec.getVariables().keySet().stream().filter(v -> !inVars.contains(v))
+			.map(v -> _ec.getVariable(v)).filter(d -> d instanceof CacheableData)
+			.forEach(c -> ((CacheableData<?>)c).freeEvictedBlob());
+		
+		//write output if required (matrix indexed write), incl cleanup pinned vars
 		//note: this copy is necessary for environments without spark libraries
-		return RemoteParForUtils.exportResultVariables(_workerID, _ec.getVariables(), _resultVars)
+		return RemoteParForUtils
+			.exportResultVariables(_workerID, _ec.getVariables(), _resultVars)
 			.stream().map(s -> new Tuple2<>(_workerID, s)).iterator();
 	}
 	
@@ -111,10 +124,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 
 		//reuse shared inputs (to read shared inputs once per process instead of once per core; 
 		//we reuse everything except result variables and partitioned input matrices)
-		_ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared inputs
 		Collection<String> blacklist = UtilFunctions.asSet(_resultVars.stream()
 			.map(v -> v._name).collect(Collectors.toList()), _ec.getVarListPartitioned());
-		reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist, _brInputs);
+		reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist, _brInputs, _cleanCache);
 		
 		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
 		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f393ebe3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index e93f0ac..fb2adc2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -174,6 +174,8 @@ public class RemoteParForUtils
 					//(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
 					ret.add( ProgramConverter.serializeDataObject(rvar._name, mo) );
 				}
+				//cleanup pinned result variable from buffer pool
+				mo.freeEvictedBlob();
 			}
 		}