You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/11/19 21:47:22 UTC
[40/50] [abbrv] incubator-systemml git commit: Improved guarded rdd collect (thread-local tracking of pinned memory)
Improved guarded rdd collect (thread-local tracking of pinned memory)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e43735a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e43735a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e43735a6
Branch: refs/heads/master
Commit: e43735a6ef5130ef3c068cd7dac124e34d4f0b4e
Parents: d21b1c3
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Nov 9 18:47:27 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Mon Nov 9 18:47:27 2015 -0800
----------------------------------------------------------------------
.../com/ibm/bi/dml/hops/OptimizerUtils.java | 4 +--
.../controlprogram/caching/MatrixObject.java | 31 ++++++++++++++++----
.../bi/dml/runtime/matrix/data/MatrixBlock.java | 14 +++++++++
3 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java b/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
index 987952c..0c76425 100644
--- a/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
+++ b/src/main/java/com/ibm/bi/dml/hops/OptimizerUtils.java
@@ -475,7 +475,7 @@ public class OptimizerUtils
* @param nnz
* @return
*/
- public static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz )
+ public static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz, long memPinned )
{
//compute size of output matrix and its blocked representation
double sp = getSparsity(rlen, clen, nnz);
@@ -483,7 +483,7 @@ public class OptimizerUtils
double memPMatrix = estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, sp);
//check if both output matrix and partitioned matrix fit into local mem budget
- return (memMatrix + memPMatrix < getLocalMemBudget());
+ return (memPinned + memMatrix + memPMatrix < getLocalMemBudget());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
index 06d224b..839a830 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
@@ -61,10 +61,16 @@ import com.ibm.bi.dml.runtime.util.MapReduceTool;
*/
public class MatrixObject extends CacheableData
{
-
private static final long serialVersionUID = 6374712373206495637L;
/**
+ * Per-thread running total of the in-memory size of currently pinned matrix
+ * data (initial value 0). It is updated on acquire/release and passed as the
+ * pinned memory to the guarded rdd collect budget check.
+ */
+ private static ThreadLocal<Long> sizePinned = new ThreadLocal<Long>() {
+ @Override protected Long initialValue() { return 0L; }
+ };
+
+ /**
* Cache for actual data, evicted by garbage collector.
*/
private SoftReference<MatrixBlock> _cache = null;
@@ -121,7 +127,6 @@ public class MatrixObject extends CacheableData
private BroadcastObject _bcHandle = null; //Broadcast handle
private RDDProperties _rddProperties = null;
-
/**
* Information relevant to partitioned matrices.
*/
@@ -523,6 +528,7 @@ public class MatrixObject extends CacheableData
//cache status maintenance
super.acquire( false, _data==null );
+ updateStatusPinned(true);
if( DMLScript.STATISTICS ){
long t1 = System.nanoTime();
@@ -577,6 +583,7 @@ public class MatrixObject extends CacheableData
//cache status maintenance
super.acquire( true, _data==null );
+ updateStatusPinned(true);
_dirtyFlag = true;
_isAcquireFromEmpty = false;
@@ -622,6 +629,7 @@ public class MatrixObject extends CacheableData
if (newData == null)
throw new CacheException("acquireModify with empty matrix block.");
_data = newData;
+ updateStatusPinned(true);
if( DMLScript.STATISTICS ){
long t1 = System.nanoTime();
@@ -667,7 +675,8 @@ public class MatrixObject extends CacheableData
//cache status maintenance (pass cacheNoWrite flag)
super.release(_isAcquireFromEmpty && !_requiresLocalWrite);
-
+ updateStatusPinned(false);
+
if( isCachingActive() //only if caching is enabled (otherwise keep everything in mem)
&& isCached(true) //not empty and not read/modify
&& !isUpdateInPlace() //pinned result variable
@@ -694,7 +703,7 @@ public class MatrixObject extends CacheableData
else if( LOG.isTraceEnabled() ){
LOG.trace("Var "+_varName+" not subject to caching: rows="+_data.getNumRows()+", cols="+_data.getNumColumns()+", state="+getStatusAsString());
}
-
+
if( DMLScript.STATISTICS ){
long t1 = System.nanoTime();
CacheStatistics.incrementReleaseTime(t1-t0);
@@ -1351,7 +1360,7 @@ public class MatrixObject extends CacheableData
long nnz = mc.getNonZeros();
//guarded rdd collect
- if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz) ) {
+ if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz, sizePinned.get()) ) {
//write RDD to hdfs and read to prevent invalid collect mem consumption
//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
@@ -1616,6 +1625,18 @@ public class MatrixObject extends CacheableData
}
/**
+ * Updates the thread-local pinned size by the current in-memory size of
+ * this object's data, as reported by getSizeInMemory(). The resulting
+ * value is clamped at zero; if no data is present, nothing is updated.
+ *
+ * @param add true to add the data size to the pinned size, false to subtract it
+ */
+ private void updateStatusPinned(boolean add) {
+ if( _data != null ) { //data should never be null
+ long size = sizePinned.get();
+ size += (add ? 1 : -1) * _data.getSizeInMemory();
+ sizePinned.set( Math.max(size,0) ); //tracked size never drops below zero
+ }
+ }
+
+ /**
* see clear data
*
* @param flag
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e43735a6/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java b/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
index 8f948bd..594c75a 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/matrix/data/MatrixBlock.java
@@ -2663,6 +2663,20 @@ public class MatrixBlock extends MatrixValue implements Externalizable
// Estimates size and sparsity
/**
+ * Estimate size based on current sparse/dense representation.
+ * Uses the sparse estimate if the block is currently flagged as sparse,
+ * and the dense estimate otherwise.
+ *
+ * @return estimated in-memory size of this block (presumably in bytes; confirm against the estimate helpers)
+ */
+ public long getSizeInMemory()
+ {
+ double sp = OptimizerUtils.getSparsity(rlen, clen, nonZeros); //only consumed by the sparse estimate
+ if( sparse )
+ return estimateSizeSparseInMemory(rlen, clen, sp);
+ else
+ return estimateSizeDenseInMemory(rlen, clen);
+ }
+
+ /**
*
* @return
*/