You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2020/11/06 14:26:45 UTC

[systemds] branch master updated: [MINOR] Minor fixes in lineage cache eviction.

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new e2414e0  [MINOR] Minor fixes in lineage cache eviction.
e2414e0 is described below

commit e2414e039d64df6e0d547e4902a9c9770aed5b5d
Author: arnabp <ar...@tugraz.at>
AuthorDate: Fri Nov 6 15:16:52 2020 +0100

    [MINOR] Minor fixes in lineage cache eviction.
    
    This patch fixes a bug where we were making space after
    putting an entry in the cache -- this could evict the current
    entry and introduce a memory leak. Now we make space before
    adding an entry to the cache. Furthermore, this patch increases
    the default I/O speed values so that spilling starts early.
    The rolling adjustment logic updates the speed values
    according to the actual disk anyway.
---
 .../apache/sysds/runtime/lineage/LineageCache.java | 30 ++++++++++++++--------
 .../sysds/runtime/lineage/LineageCacheConfig.java  | 11 +++++---
 .../runtime/lineage/LineageCacheEviction.java      | 13 +++++-----
 3 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e3cb62b..71d55e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -277,18 +277,18 @@ public class LineageCache
 					LineageItem item = entry.getKey();
 					Data data = entry.getValue();
 					LineageCacheEntry centry = _cache.get(item);
-					if (data instanceof MatrixObject)
-						centry.setValue(((MatrixObject)data).acquireReadAndRelease(), computetime);
-					else if (data instanceof ScalarObject)
-						centry.setValue((ScalarObject)data, computetime);
-					else {
+
+					if (!(data instanceof MatrixObject) && !(data instanceof ScalarObject)) {
 						// Reusable instructions can return a frame (rightIndex). Remove placeholders.
 						_cache.remove(item);
 						continue;
 					}
 
-					long size = centry.getSize();
-					//remove the entry if the entry is bigger than the cache.
+					MatrixBlock mb = (data instanceof MatrixObject) ? 
+							((MatrixObject)data).acquireReadAndRelease() : null;
+					long size = mb != null ? mb.getInMemorySize() : ((ScalarObject)data).getSize();
+
+					//remove the placeholder if the entry is bigger than the cache.
 					//FIXME: the resumed threads will enter into infinite wait as the entry
 					//is removed. Need to add support for graceful remove (placeholder) and resume.
 					if (size > LineageCacheEviction.getCacheLimit()) {
@@ -296,12 +296,20 @@ public class LineageCache
 						continue; 
 					}
 
-					//maintain order for eviction
-					LineageCacheEviction.addEntry(centry);
-
+					//make space for the data
 					if (!LineageCacheEviction.isBelowThreshold(size))
 						LineageCacheEviction.makeSpace(_cache, size);
 					LineageCacheEviction.updateSize(size, true);
+
+					//place the data
+					if (data instanceof MatrixObject)
+						centry.setValue(mb, computetime);
+					else if (data instanceof ScalarObject)
+						centry.setValue((ScalarObject)data, computetime);
+
+					//maintain order for eviction
+					LineageCacheEviction.addEntry(centry);
+
 				}
 			}
 		}
@@ -358,7 +366,7 @@ public class LineageCache
 		// Create a new entry.
 		LineageCacheEntry newItem = new LineageCacheEntry(key, dt, Mval, Sval, computetime);
 		
-		// Make space by removing or spilling LRU entries.
+		// Make space by removing or spilling entries.
 		if( Mval != null || Sval != null ) {
 			long size = newItem.getSize();
 			if( size > LineageCacheEviction.getCacheLimit())
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 66972c4..6a235b6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -79,10 +79,13 @@ public class LineageCacheConfig
 	// Minimum reliable data size for spilling estimate in MB.
 	public static final double MIN_SPILL_DATA = 2;
 	// Default I/O in MB per second for binary blocks
-	public static double FSREAD_DENSE = 200;
-	public static double FSREAD_SPARSE = 100;
-	public static double FSWRITE_DENSE = 150;
-	public static double FSWRITE_SPARSE = 75;
+	// NOTE: These defaults are tuned according to high
+	// speed disks, so that spilling starts early. These 
+	// will anyway be adjusted as per the current disk.
+	public static double FSREAD_DENSE = 500;
+	public static double FSREAD_SPARSE = 400;
+	public static double FSWRITE_DENSE = 450;
+	public static double FSWRITE_SPARSE = 225;
 	
 	private enum CachedItemHead {
 		TSMM,
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 553ca03..7025818 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -213,13 +213,12 @@ public class LineageCacheEviction
 			double exectime = ((double) e._computeTime) / 1000000; // in milliseconds
 
 			if (LineageCache.DEBUG) {
-				if (exectime > LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
-					System.out.print("LI " + e._key.getOpcode());
-					System.out.print(" exec time " + ((double) e._computeTime) / 1000000);
-					System.out.print(" spill time " + getDiskSpillEstimate(e) * 1000);
-					System.out.print(" dim " + e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
-					System.out.println(" size " + getDiskSizeEstimate(e));
-				}
+				System.out.print("LI = " + e._key.getOpcode());
+				System.out.print(" exec time = " + ((double) e._computeTime) / 1000000);
+				System.out.println(" spill time = " + getDiskSpillEstimate(e) * 1000);
+				System.out.print("dim = " + e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+				System.out.print(" size = " + getDiskSizeEstimate(e));
+				System.out.println(" DAG height = " + e._key.getDistLeaf2Node());
 			}
 
 			if (spilltime < LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {