You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink in the original page and is not preserved in this text.
Posted to commits@systemml.apache.org by mb...@apache.org on 2015/12/04 19:04:26 UTC

[5/5] incubator-systemml git commit: Improved robustness of broadcast reuse (cleanup on demand via soft references)

Improved robustness of broadcast reuse (cleanup on demand via soft references)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5f058f08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5f058f08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5f058f08

Branch: refs/heads/master
Commit: 5f058f08d1db0b14cd431d85eaa17746b5cf99bb
Parents: 59d4a15
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 3 19:29:17 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 3 19:29:17 2015 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java             | 13 ++++++++-----
 .../instructions/spark/data/BroadcastObject.java   | 17 +++++++++++++----
 .../spark/data/PartitionedBroadcastMatrix.java     |  3 +--
 3 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index e4a5a5f..a7bb99f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -354,13 +354,15 @@ public class SparkExecutionContext extends ExecutionContext
 		
 		PartitionedBroadcastMatrix bret = null;
 		
-		if(    mo.getBroadcastHandle()!=null 
+		//reuse existing broadcast handle
+		if( mo.getBroadcastHandle()!=null 
 			&& mo.getBroadcastHandle().isValid() ) 
 		{
-			//reuse existing broadcast handle
 			bret = mo.getBroadcastHandle().getBroadcast();
 		}
-		else 
+		
+		//create new broadcast handle (never created, evicted)
+		if( bret == null ) 
 		{
 			//obtain meta data for matrix 
 			int brlen = (int) mo.getNumRowsPerBlock();
@@ -892,8 +894,9 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 		else if( lob instanceof BroadcastObject ) {
 			PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast();
-			for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
-				cleanupBroadcastVariable(bc);
+			if( pbm != null ) //robustness for evictions
+				for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
+					cleanupBroadcastVariable(bc);
 		}
 	
 		//recursively process lineage children

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 2d306a7..da19319 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -17,15 +17,18 @@
 
 package org.apache.sysml.runtime.instructions.spark.data;
 
+import java.lang.ref.SoftReference;
+
 import org.apache.spark.broadcast.Broadcast;
 
 public class BroadcastObject extends LineageObject
 {
-	private PartitionedBroadcastMatrix _bcHandle = null;
+	//soft reference storage for graceful cleanup in case of memory pressure
+	private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
 	
 	public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
 	{
-		_bcHandle = bvar;
+		_bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
 		_varName = varName;
 	}
 	
@@ -35,7 +38,7 @@ public class BroadcastObject extends LineageObject
 	 */
 	public PartitionedBroadcastMatrix getBroadcast()
 	{
-		return _bcHandle;
+		return _bcHandle.get();
 	}
 	
 	/**
@@ -44,7 +47,13 @@ public class BroadcastObject extends LineageObject
 	 */
 	public boolean isValid() 
 	{
-		Broadcast<PartitionedMatrixBlock>[] tmp = _bcHandle.getBroadcasts();
+		//check for evicted soft reference
+		PartitionedBroadcastMatrix pbm = _bcHandle.get();
+		if( pbm == null )
+			return false;
+		
+		//check for validity of individual broadcasts
+		Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
 		for( Broadcast<PartitionedMatrixBlock> bc : tmp )
 			if( !bc.isValid() )
 				return false;		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
index 205ab80..1fbf605 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
@@ -36,8 +36,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 public class PartitionedBroadcastMatrix implements Serializable
 {
 	private static final long serialVersionUID = 1225135967889810877L;
-
-	private static long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB 
+	private static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB 
 	
 	private Broadcast<PartitionedMatrixBlock>[] _pbc = null;