You are viewing a plain-text version of this content; the canonical version is available via the original mailing-list archive link.
Posted to commits@systemml.apache.org by mb...@apache.org on 2015/12/04 19:04:26 UTC
[5/5] incubator-systemml git commit: Improved robustness of broadcast reuse (cleanup on demand via softref)
Improved robustness of broadcast reuse (cleanup on demand via softref)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5f058f08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5f058f08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5f058f08
Branch: refs/heads/master
Commit: 5f058f08d1db0b14cd431d85eaa17746b5cf99bb
Parents: 59d4a15
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 3 19:29:17 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 3 19:29:17 2015 -0800
----------------------------------------------------------------------
.../context/SparkExecutionContext.java | 13 ++++++++-----
.../instructions/spark/data/BroadcastObject.java | 17 +++++++++++++----
.../spark/data/PartitionedBroadcastMatrix.java | 3 +--
3 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index e4a5a5f..a7bb99f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -354,13 +354,15 @@ public class SparkExecutionContext extends ExecutionContext
PartitionedBroadcastMatrix bret = null;
- if( mo.getBroadcastHandle()!=null
+ //reuse existing broadcast handle
+ if( mo.getBroadcastHandle()!=null
&& mo.getBroadcastHandle().isValid() )
{
- //reuse existing broadcast handle
bret = mo.getBroadcastHandle().getBroadcast();
}
- else
+
+ //create new broadcast handle (never created, evicted)
+ if( bret == null )
{
//obtain meta data for matrix
int brlen = (int) mo.getNumRowsPerBlock();
@@ -892,8 +894,9 @@ public class SparkExecutionContext extends ExecutionContext
}
else if( lob instanceof BroadcastObject ) {
PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast();
- for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
- cleanupBroadcastVariable(bc);
+ if( pbm != null ) //robustness for evictions
+ for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
+ cleanupBroadcastVariable(bc);
}
//recursively process lineage children
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 2d306a7..da19319 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -17,15 +17,18 @@
package org.apache.sysml.runtime.instructions.spark.data;
+import java.lang.ref.SoftReference;
+
import org.apache.spark.broadcast.Broadcast;
public class BroadcastObject extends LineageObject
{
- private PartitionedBroadcastMatrix _bcHandle = null;
+ //soft reference storage for graceful cleanup in case of memory pressure
+ private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
{
- _bcHandle = bvar;
+ _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
_varName = varName;
}
@@ -35,7 +38,7 @@ public class BroadcastObject extends LineageObject
*/
public PartitionedBroadcastMatrix getBroadcast()
{
- return _bcHandle;
+ return _bcHandle.get();
}
/**
@@ -44,7 +47,13 @@ public class BroadcastObject extends LineageObject
*/
public boolean isValid()
{
- Broadcast<PartitionedMatrixBlock>[] tmp = _bcHandle.getBroadcasts();
+ //check for evicted soft reference
+ PartitionedBroadcastMatrix pbm = _bcHandle.get();
+ if( pbm == null )
+ return false;
+
+ //check for validity of individual broadcasts
+ Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
for( Broadcast<PartitionedMatrixBlock> bc : tmp )
if( !bc.isValid() )
return false;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5f058f08/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
index 205ab80..1fbf605 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
@@ -36,8 +36,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
public class PartitionedBroadcastMatrix implements Serializable
{
private static final long serialVersionUID = 1225135967889810877L;
-
- private static long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB
+ private static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB
private Broadcast<PartitionedMatrixBlock>[] _pbc = null;