You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/11/19 21:47:22 UTC

[40/50] [abbrv] incubator-systemml git commit: Improved guarded rdd collect (thread-local tracking of pinned memory)

Improved guarded rdd collect (thread-local tracking of pinned memory)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e43735a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e43735a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e43735a6

Branch: refs/heads/master
Commit: e43735a6ef5130ef3c068cd7dac124e34d4f0b4e
Parents: d21b1c3
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Nov 9 18:47:27 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Mon Nov 9 18:47:27 2015 -0800

----------------------------------------------------------------------
 .../com/ibm/bi/dml/hops/OptimizerUtils.java     |  4 +--
 .../controlprogram/caching/MatrixObject.java    | 31 ++++++++++++++++----
 .../bi/dml/runtime/matrix/data/MatrixBlock.java | 14 +++++++++
 3 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java b/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
index 987952c..0c76425 100644
--- a/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
+++ b/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
@@ -475,7 +475,7 @@ public class OptimizerUtils
 	 * @param nnz
 	 * @return
 	 */
-	public static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz )
+	public static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz, long memPinned )
 	{
 		//compute size of output matrix and its blocked representation
 		double sp = getSparsity(rlen, clen, nnz);
@@ -483,7 +483,7 @@ public class OptimizerUtils
 		double memPMatrix = estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, sp);
 		
 		//check if both output matrix and partitioned matrix fit into local mem budget
-		return (memMatrix + memPMatrix < getLocalMemBudget());
+		return (memPinned + memMatrix + memPMatrix < getLocalMemBudget());
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
index 06d224b..839a830 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
@@ -61,10 +61,16 @@ import com.ibm.bi.dml.runtime.util.MapReduceTool;
  */
 public class MatrixObject extends CacheableData
 {
-
 	private static final long serialVersionUID = 6374712373206495637L;
 
 	/**
+	 * Per-thread sum of the estimated in-memory sizes of currently pinned matrix data, required for guarded collect.
+	 */
+	private static ThreadLocal<Long> sizePinned = new ThreadLocal<Long>() {
+        @Override protected Long initialValue() { return 0L; }
+    };
+	
+	/**
 	 * Cache for actual data, evicted by garbage collector.
 	 */
 	private SoftReference<MatrixBlock> _cache = null;
@@ -121,7 +127,6 @@ public class MatrixObject extends CacheableData
 	private BroadcastObject _bcHandle = null; //Broadcast handle
 	private RDDProperties _rddProperties = null;
 	
-	
 	/**
 	 * Information relevant to partitioned matrices.
 	 */
@@ -523,6 +528,7 @@ public class MatrixObject extends CacheableData
 		
 		//cache status maintenance
 		super.acquire( false, _data==null );	
+		updateStatusPinned(true);
 		
 		if( DMLScript.STATISTICS ){
 			long t1 = System.nanoTime();
@@ -577,6 +583,7 @@ public class MatrixObject extends CacheableData
 
 		//cache status maintenance
 		super.acquire( true, _data==null );
+		updateStatusPinned(true);
 		_dirtyFlag = true;
 		_isAcquireFromEmpty = false;
 		
@@ -622,6 +629,7 @@ public class MatrixObject extends CacheableData
 		if (newData == null)
 			throw new CacheException("acquireModify with empty matrix block.");
 		_data = newData; 
+		updateStatusPinned(true);
 		
 		if( DMLScript.STATISTICS ){
 			long t1 = System.nanoTime();
@@ -667,7 +675,8 @@ public class MatrixObject extends CacheableData
 		
 		//cache status maintenance (pass cacheNoWrite flag)
 		super.release(_isAcquireFromEmpty && !_requiresLocalWrite);
-
+		updateStatusPinned(false);
+		
 		if(    isCachingActive() //only if caching is enabled (otherwise keep everything in mem)
 			&& isCached(true)    //not empty and not read/modify
 			&& !isUpdateInPlace()        //pinned result variable
@@ -694,7 +703,7 @@ public class MatrixObject extends CacheableData
 		else if( LOG.isTraceEnabled() ){
 			LOG.trace("Var "+_varName+" not subject to caching: rows="+_data.getNumRows()+", cols="+_data.getNumColumns()+", state="+getStatusAsString());
 		}
-		
+
 		if( DMLScript.STATISTICS ){
 			long t1 = System.nanoTime();
 			CacheStatistics.incrementReleaseTime(t1-t0);
@@ -1351,7 +1360,7 @@ public class MatrixObject extends CacheableData
 			long nnz = mc.getNonZeros();
 			
 			//guarded rdd collect 
-			if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz) ) {
+			if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz, sizePinned.get()) ) {
 				//write RDD to hdfs and read to prevent invalid collect mem consumption 
 				//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
 				if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
@@ -1616,6 +1625,18 @@ public class MatrixObject extends CacheableData
 	}
 	
 	/**
+	 * Adds or subtracts the estimated in-memory size of the current data to/from the thread-local pinned size.
+	 * @param add true to add the size of the current data, false to subtract it
+	 */
+	private void updateStatusPinned(boolean add) {
+		if( _data != null ) { //data should never be null
+			long size = sizePinned.get(); //pinned size tracked for the calling thread
+			size += (add ? 1 : -1) * _data.getSizeInMemory();
+			sizePinned.set( Math.max(size,0) ); //clamp at zero, never store a negative size
+		}
+	}
+	
+	/**
 	 * see clear data
 	 * 
 	 * @param flag

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java b/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
index 8f948bd..594c75a 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
@@ -2663,6 +2663,20 @@ public class MatrixBlock extends MatrixValue implements Externalizable
 	// Estimates size and sparsity
 	
 	/**
+	 * Estimate size based on current sparse/dense representation.
+	 * Sparsity is derived from rlen, clen and nonZeros; it is only used by the sparse estimate.
+	 * @return estimated in-memory size of this block (presumably in bytes; confirm against the estimateSize*InMemory helpers)
+	 */
+	public long getSizeInMemory() 
+	{
+		double sp = OptimizerUtils.getSparsity(rlen, clen, nonZeros);
+		if( sparse )
+			return estimateSizeSparseInMemory(rlen, clen, sp);
+		else
+			return estimateSizeDenseInMemory(rlen, clen);
+	}
+	
+	/**
 	 * 
 	 * @return
 	 */