You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2020/06/01 20:57:39 UTC

[systemml] branch master updated: [SYSTEMDS-411] Efficient multi-level lineage cache management

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 722eaf6  [SYSTEMDS-411] Efficient multi-level lineage cache management
722eaf6 is described below

commit 722eaf6eb13efade8e80552b9ef07f611e6ddc1a
Author: arnabp <ar...@tugraz.at>
AuthorDate: Mon Jun 1 22:54:02 2020 +0200

    [SYSTEMDS-411] Efficient multi-level lineage cache management
    
    This patch improves the handling of multiple cache entries pointing to
    the same data (due to multilevel caching).
    
    1) All the entries with the same values are connected by a linked list.
    Even though they output the same data, they have different computation times.
    
    2) The eviction logic marks an entry for deferred spilling/removal if other
     entries are linked to it. Only when all the entries in a list are marked
     for spilling or removal do we evict the item.
    
    3) Disk writes and reads happen only once for all the items connected to a
     single matrix. This way, a single read restores multiple entries to the
     cache, and a single write frees more space.
    
    4) Initial experiments show huge improvements in cache management. The
     cache can now store many more entries (this patch fixes duplicate size
     calculations) and needs fewer disk I/O operations. Overall, these
     changes improve the cache hit count.
    
    Closes #932.
---
 dev/docs/Tasks.txt                                 |   3 +
 .../apache/sysds/runtime/lineage/LineageCache.java |  22 ++--
 .../sysds/runtime/lineage/LineageCacheConfig.java  |   9 +-
 .../sysds/runtime/lineage/LineageCacheEntry.java   |  12 ++
 .../runtime/lineage/LineageCacheEviction.java      | 137 ++++++++++++++++-----
 5 files changed, 139 insertions(+), 44 deletions(-)

diff --git a/dev/docs/Tasks.txt b/dev/docs/Tasks.txt
index 6b5dbb0..9a51eb5 100644
--- a/dev/docs/Tasks.txt
+++ b/dev/docs/Tasks.txt
@@ -314,5 +314,8 @@ SYSTEMDS-390 New Builtin Functions IV
 SYSTEMDS-400 Spark Backend Improvements
  * 401 Fix output block indexes of rdiag (diagM2V)                    OK
 
+SYSTEMDS-410 Lineage Tracing, Reuse and Integration II
+ * 411 Improved handling of multi-level cache duplicates              OK 
+
 Others:
  * Break append instruction to cbind and rbind 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index ac54b70..ca6b349 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -39,6 +39,7 @@ import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
@@ -255,7 +256,7 @@ public class LineageCache
 			LineageItem boundLI = ec.getLineage().get(boundVarName);
 			if (boundLI != null)
 				boundLI.resetVisitStatus();
-			if (boundLI == null || !LineageCache.probe(li)) {
+			if (boundLI == null || !LineageCache.probe(li) || !LineageCache.probe(boundLI)) {
 				AllOutputsCacheable = false;
 			}
 			FuncLIMap.put(li, boundLI);
@@ -282,7 +283,7 @@ public class LineageCache
 	
 	//----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
 	
-	protected static void putIntern(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
+	private static void putIntern(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
 		if (_cache.containsKey(key))
 			//can come here if reuse_partial option is enabled
 			return;
@@ -300,7 +301,7 @@ public class LineageCache
 			LineageCacheEviction.updateSize(size, true);
 		}
 		
-		// Place the entry at head position.
+		// Place the entry in the weighted queue.
 		LineageCacheEviction.addEntry(newItem);
 		
 		_cache.put(key, newItem);
@@ -310,8 +311,7 @@ public class LineageCache
 	
 	private static LineageCacheEntry getIntern(LineageItem key) {
 		// This method is called only when entry is present either in cache or in local FS.
-		if (_cache.containsKey(key)) {
-			// Read and put the entry at head.
+		if (_cache.containsKey(key) && _cache.get(key).getCacheStatus() != LineageCacheStatus.SPILLED) {
 			LineageCacheEntry e = _cache.get(key);
 			// Maintain order for eviction
 			LineageCacheEviction.getEntry(e);
@@ -336,14 +336,16 @@ public class LineageCache
 			else
 				e.setValue(oe.getSOValue(), computetime);
 			e._origItem = probeItem; 
+			// Add the SB/func entry to the end of the list of items pointing to the same data.
+			// No cache size update is necessary.
+			LineageCacheEntry tmp = oe;
+			// Maintain _origItem as head.
+			while (tmp._nextEntry != null)
+				tmp = tmp._nextEntry;
+			tmp._nextEntry = e;
 			
 			//maintain order for eviction
 			LineageCacheEviction.addEntry(e);
-
-			long size = oe.getSize();
-			if(!LineageCacheEviction.isBelowThreshold(size)) 
-				LineageCacheEviction.makeSpace(_cache, size);
-			LineageCacheEviction.updateSize(size, true);
 		}
 		else
 			_cache.remove(item);    //remove the placeholder
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 2a3c426..7fce53b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -33,8 +33,9 @@ public class LineageCacheConfig
 	//-------------CACHING LOGIC RELATED CONFIGURATIONS--------------//
 
 	private static final String[] REUSE_OPCODES = new String[] {
-		"tsmm", "ba+*", "*", "/", "+", "nrow", "ncol", "round", "exp", "log",
+		"tsmm", "ba+*", "*", "/", "+", "||", "nrow", "ncol", "round", "exp", "log",
 		"rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof"
+		//TODO: Reuse everything. 
 	};
 	
 	public enum ReuseCacheType {
@@ -97,9 +98,11 @@ public class LineageCacheConfig
 	protected enum LineageCacheStatus {
 		EMPTY,     //Placeholder with no data. Cannot be evicted.
 		CACHED,    //General cached data. Can be evicted.
-		EVICTED,   //Data is in disk. Empty value. Cannot be evicted.
+		SPILLED,   //Data is in disk. Empty value. Cannot be evicted.
 		RELOADED,  //Reloaded from disk. Can be evicted.
-		PINNED;    //Pinned to memory. Cannot be evicted.
+		PINNED,    //Pinned to memory. Cannot be evicted.
+		TOSPILL,   //To be spilled lazily 
+		TODELETE;  //TO be removed lazily
 		public boolean canEvict() {
 			return this == CACHED || this == RELOADED;
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 485cac6..256d85f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -33,6 +33,7 @@ public class LineageCacheEntry {
 	protected long _computeTime;
 	protected long _timestamp = 0;
 	protected LineageCacheStatus _status;
+	protected LineageCacheEntry _nextEntry;
 	protected LineageItem _origItem;
 	
 	public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
@@ -42,6 +43,7 @@ public class LineageCacheEntry {
 		_SOval = Sval;
 		_computeTime = computetime;
 		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
+		_nextEntry = null;
 		_origItem = null;
 	}
 	
@@ -100,6 +102,10 @@ public class LineageCacheEntry {
 		//resume all threads waiting for val
 		notifyAll();
 	}
+	
+	public synchronized void setValue(MatrixBlock val) {
+		setValue(val, _computeTime);
+	}
 
 	public synchronized void setValue(ScalarObject val, long computetime) {
 		_SOval = val;
@@ -109,6 +115,12 @@ public class LineageCacheEntry {
 		notifyAll();
 	}
 	
+	protected synchronized void setNullValues() {
+		_MBval = null;
+		_SOval = null;
+		_status = LineageCacheStatus.EMPTY;
+	}
+	
 	protected synchronized void setTimestamp() {
 		_timestamp = System.currentTimeMillis();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 127e152..b83a78b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
@@ -89,22 +88,68 @@ public class LineageCacheEviction
 		}
 	}
 
-	protected static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, LineageItem key) {
-		if (!cache.containsKey(key))
-			return;
-		weightedQueue.remove(cache.get(key));
-		cache.remove(key);
-	}
-
 	private static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry e) {
-		if (DMLScript.STATISTICS)
-			_removelist.add(e._key);
+		if (cache.remove(e._key) != null)
+			_cachesize -= e.getSize();
 
-		_cachesize -= e.getSize();
-		// NOTE: The caller of this method maintains the cache and the eviction queue.
-
-		if (DMLScript.STATISTICS)
+		if (DMLScript.STATISTICS) {
+			_removelist.add(e._key);
 			LineageCacheStatistics.incrementMemDeletes();
+		}
+		// NOTE: The caller of this method maintains the eviction queue.
+	}
+	private static void removeOrSpillEntry(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry e, boolean spill) {
+		if (e._origItem == null) {
+			// Single entry. Remove or spill.
+			if (spill)
+				spillToLocalFS(cache, e);
+			else
+				removeEntry(cache, e);
+			return;
+		}
+		
+		// Defer the eviction till all the entries with the same matrix are evicted.
+		e.setCacheStatus(spill ? LineageCacheStatus.TOSPILL : LineageCacheStatus.TODELETE);
+
+		// If all the entries with the same data are evicted, check if deferred spilling 
+		// is set for any of those. If so, spill the matrix to disk and set null in the 
+		// cache entries. Keeping the spilled entries removes the need to use another 
+		// data structure and also maintains the connections between items pointing to the 
+		// same data. Delete all the entries if all are set to be deleted.
+		boolean write = false;
+		LineageCacheEntry tmp = cache.get(e._origItem); //head
+		while (tmp != null) {
+			if (tmp.getCacheStatus() != LineageCacheStatus.TOSPILL
+				&& tmp.getCacheStatus() != LineageCacheStatus.TODELETE)
+				return; //do nothing
+
+			write |= (tmp.getCacheStatus() == LineageCacheStatus.TOSPILL);
+			tmp = tmp._nextEntry;
+		}
+		if (write) {
+			// Spill to disk if at least one entry has status TOSPILL. 
+			spillToLocalFS(cache, cache.get(e._origItem));
+			LineageCacheEntry h = cache.get(e._origItem);
+			while (h != null) {
+				// Set values to null for all the entries.
+				h.setNullValues();
+				// Set status to spilled for all the entries.
+				h.setCacheStatus(LineageCacheStatus.SPILLED);
+				h = h._nextEntry;
+			}
+			// Keep them in cache.
+			return;
+		}
+		// All are set to be deleted.
+		else {
+			// Remove all the entries from cache.
+			LineageCacheEntry h = cache.get(e._origItem);
+			while (h != null) {
+				removeEntry(cache, h);
+				h = h._nextEntry;
+			}
+		}
+		// NOTE: The callers of this method maintain the eviction queue.
 	}
 
 	//---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
@@ -113,6 +158,7 @@ public class LineageCacheEviction
 		CACHE_LIMIT = limit;
 	}
 
+	//Note: public for spilling tests
 	public static long getCacheLimit() {
 		return CACHE_LIMIT;
 	}
@@ -139,8 +185,7 @@ public class LineageCacheEviction
 
 			if (!LineageCacheConfig.isSetSpill()) {
 				// If eviction is disabled, just delete the entries.
-				if (cache.remove(e._key) != null)
-					removeEntry(cache, e);
+				removeOrSpillEntry(cache, e, false);
 				e = weightedQueue.pollFirst();
 				continue;
 			}
@@ -157,8 +202,7 @@ public class LineageCacheEviction
 			if (!e.isMatrixValue()) {
 				// No spilling for scalar entries. Just delete those.
 				// Note: scalar entries with higher computation time are pinned.
-				if (cache.remove(e._key) != null)
-					removeEntry(cache, e);
+				removeOrSpillEntry(cache, e, false);
 				e = weightedQueue.pollFirst();
 				continue;
 			}
@@ -180,17 +224,21 @@ public class LineageCacheEviction
 				// Can't trust the estimate if less than 100ms.
 				// Spill if it takes longer to recompute.
 				if (exectime >= LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
-					spillToLocalFS(e);
+					//spillToLocalFS(e);
+					removeOrSpillEntry(cache, e, true);  //spill
+				else
+					removeOrSpillEntry(cache, e, false); //delete
 			}
 			else {
 				// Spill if it takes longer to recompute than spilling.
 				if (exectime > spilltime)
-					spillToLocalFS(e);
+					//spillToLocalFS(e);
+					removeOrSpillEntry(cache, e, true);  //spill
+				else
+					removeOrSpillEntry(cache, e, false); //delete
 			}
 
 			// Remove the entry from cache.
-			if (cache.remove(e._key) != null)
-				removeEntry(cache, e);
 			e = weightedQueue.pollFirst();
 		}
 	}
@@ -259,7 +307,7 @@ public class LineageCacheEviction
 
 	// ---------------- I/O METHODS TO LOCAL FS -----------------
 	
-	private static void spillToLocalFS(LineageCacheEntry entry) {
+	private static void spillToLocalFS(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry entry) {
 		if (!entry.isMatrixValue())
 			throw new DMLRuntimeException ("Spilling scalar objects to disk is not allowd. Key: "+entry._key);
 		if (entry.isNullVal())
@@ -280,15 +328,28 @@ public class LineageCacheEviction
 		// Adjust disk writing speed
 		adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, false);
 		
+		// Add all the entries associated with this matrix to spillList.
+		if (entry._origItem == null) {
+			_spillList.put(entry._key, new SpilledItem(outfile));
+		}
+		else {
+			LineageCacheEntry h = cache.get(entry._origItem); //head
+			while (h != null) {
+				_spillList.put(h._key, new SpilledItem(outfile));
+				h = h._nextEntry;
+			}
+		}
+
 		if (DMLScript.STATISTICS) {
 			LineageCacheStatistics.incrementFSWriteTime(t1-t0);
 			LineageCacheStatistics.incrementFSWrites();
 		}
-
-		_spillList.put(entry._key, new SpilledItem(outfile, entry._computeTime));
 	}
 
 	protected static LineageCacheEntry readFromLocalFS(Map<LineageItem, LineageCacheEntry> cache, LineageItem key) {
+		if (cache.get(key) == null)
+			throw new DMLRuntimeException ("Spilled item should present in cache. Key: "+key);
+
 		long t0 = System.nanoTime();
 		MatrixBlock mb = null;
 		// Read from local FS
@@ -297,12 +358,23 @@ public class LineageCacheEviction
 		} catch (IOException e) {
 			throw new DMLRuntimeException ("Read from " + _spillList.get(key)._outfile + " failed.", e);
 		}
-		// Restore to cache
 		LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, true);
 		long t1 = System.nanoTime();
-		LineageCache.putIntern(key, DataType.MATRIX, mb, null, _spillList.get(key)._computeTime);
+
+		// Restore to cache
+		LineageCacheEntry e = cache.get(key);
+		e.setValue(mb);
+		if (e._origItem != null) {
+			// Restore to all the entries having the same data.
+			LineageCacheEntry h = cache.get(e._origItem); //head
+			while (h != null) {
+				h.setValue(mb);
+				h = h._nextEntry;
+			}
+		}
+
 		// Adjust disk reading speed
-		adjustReadWriteSpeed(cache.get(key), ((double)(t1-t0))/1000000000, true);
+		adjustReadWriteSpeed(e, ((double)(t1-t0))/1000000000, true);
 		// TODO: set cache status as RELOADED for this entry
 		_spillList.remove(key);
 		if (DMLScript.STATISTICS) {
@@ -318,13 +390,16 @@ public class LineageCacheEviction
 
 	// ---------------- INTERNAL DATA STRUCTURES FOR EVICTION -----------------
 
+	// TODO: Remove this class, and add outfile to LineageCacheEntry.
 	private static class SpilledItem {
 		String _outfile;
-		long _computeTime;
+		//long _computeTime;
+		//protected LineageItem _origItem;
 
-		public SpilledItem(String outfile, long computetime) {
+		public SpilledItem(String outfile) {
 			_outfile = outfile;
-			_computeTime = computetime;
+			//_computeTime = computetime;
+			//_origItem = origItem;
 		}
 	}
 }