You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/21 07:54:10 UTC

[2/2] systemml git commit: [SYSTEMML-2026] Fix OOM on codegen spark operations w/ broadcasts

[SYSTEMML-2026] Fix OOM on codegen spark operations w/ broadcasts

This patch fixes an OOM issue encountered on perftest stratstats data
generation for the 80GB scenario w/ enabled code generation. In detail,
all codegen spark operations now take already existing broadcasts
(stored in blockified form in the driver block manager) and correct,
i.e., partitioned, broadcast sizes into account.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/20f97e0b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/20f97e0b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/20f97e0b

Branch: refs/heads/master
Commit: 20f97e0b5159aef1289b9a8f58316c3b2067ea7d
Parents: 5491c9d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Nov 20 23:15:49 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Nov 20 23:54:19 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   | 30 ++++++++------------
 .../instructions/spark/SpoofSPInstruction.java  | 11 ++++---
 2 files changed, 19 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/20f97e0b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 9e787de..be7b9b9 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -116,26 +116,26 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	private static IDSequence _seq = null;   
 
 	// Global eviction path and prefix (prefix used for isolation purposes)
-    public static String cacheEvictionLocalFilePath = null; //set during init
-    public static String cacheEvictionLocalFilePrefix = "cache";
+	public static String cacheEvictionLocalFilePath = null; //set during init
+	public static String cacheEvictionLocalFilePrefix = "cache";
 
 	/**
 	 * Current state of pinned variables, required for guarded collect.
 	 */
 	private static ThreadLocal<Long> sizePinned = new ThreadLocal<Long>() {
-        @Override protected Long initialValue() { return 0L; }
-    };
+		@Override protected Long initialValue() { return 0L; }
+	};
 
 	//current size of live broadcast objects (because Spark's ContextCleaner maintains 
 	//a buffer with references to prevent eager cleanup by GC); note that this is an 
 	//overestimate, because we maintain partitioned broadcasts as soft references, which 
 	//might be collected by the GC and subsequently cleaned up by Spark's ContextCleaner.
-	private static AtomicLong _refBCs = new AtomicLong(0);	
-    
+	private static final AtomicLong _refBCs = new AtomicLong(0);
+
 	static {
 		_seq = new IDSequence();
 	}
-		
+
 	/**
 	 * The unique (JVM-wide) ID of a cacheable data object; to ensure unique IDs across JVMs, we
 	 * concatenate filenames with a unique prefix (map task ID). 
@@ -651,8 +651,6 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		}
 	}
 	
-	protected void clearReusableData() {}
-	
 	/**
 	 * Sets the cache block reference to <code>null</code>, abandons the old block.
 	 * Makes the "envelope" empty.  Run it to finalize the object (otherwise the
@@ -682,8 +680,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			freeEvictedBlob();	
 		
 		// clear the in-memory data
-		clearReusableData();
-		_data = null;	
+		_data = null;
 		clearCache();
 		
 		// clear rdd/broadcast back refs
@@ -691,13 +688,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			_rddHandle.setBackReference(null);
 		if( _bcHandle != null )
 			_bcHandle.setBackReference(null);
-		if( _gpuObjects != null ) {
-		    for (GPUObject gObj : _gpuObjects.values()){
-		        if (gObj != null) {
-                    gObj.clearData();
-                }
-            }
-        }
+		if( _gpuObjects != null )
+			for (GPUObject gObj : _gpuObjects.values())
+				if (gObj != null)
+					gObj.clearData();
 
 		// change object state EMPTY
 		setDirty(false);

http://git-wip-us.apache.org/repos/asf/systemml/blob/20f97e0b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index 2f3eedb..cb3ad14 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -47,6 +47,7 @@ import org.apache.sysml.runtime.codegen.SpoofOuterProduct;
 import org.apache.sysml.runtime.codegen.SpoofOuterProduct.OutProdType;
 import org.apache.sysml.runtime.codegen.SpoofRowwise;
 import org.apache.sysml.runtime.codegen.SpoofRowwise.RowType;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.Builtin;
@@ -253,7 +254,8 @@ public class SpoofSPInstruction extends SPInstruction {
 		throws DMLRuntimeException 
 	{
 		boolean[] ret = new boolean[inputs.length];
-		double localBudget = OptimizerUtils.getLocalMemBudget();
+		double localBudget = OptimizerUtils.getLocalMemBudget()
+			- CacheableData.getBroadcastSize(); //account for other broadcasts
 		double bcBudget = SparkExecutionContext.getBroadcastMemoryBudget();
 		
 		//decided for each matrix input if it fits into remaining memory
@@ -263,9 +265,10 @@ public class SpoofSPInstruction extends SPInstruction {
 				MatrixCharacteristics mc = sec.getMatrixCharacteristics(inputs[i].getName());
 				double sizeL = OptimizerUtils.estimateSizeExactSparsity(mc);
 				double sizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
-				ret[i] = localBudget > sizeL && bcBudget > sizeP;
-				localBudget -= ret[i] ? sizeL : 0;
-				bcBudget -= ret[i] ? sizeP : 0;
+				//account for partitioning and local/remote budgets
+				ret[i] = localBudget > (sizeL + sizeP) && bcBudget > sizeP;
+				localBudget -= ret[i] ? sizeP : 0; //in local block manager
+				bcBudget -= ret[i] ? sizeP : 0; //in remote block managers
 			}
 		
 		return ret;