You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2022/04/09 12:22:46 UTC

[systemds] branch main updated: [SYSTEMDS-3341] Unified Memory Manager initial implementation

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 985e4916fa [SYSTEMDS-3341] Unified Memory Manager initial implementation
985e4916fa is described below

commit 985e4916facdda74fccfc219fec64e8b3bfcadd3
Author: arnabp <ar...@tugraz.at>
AuthorDate: Sat Apr 9 14:04:32 2022 +0200

    [SYSTEMDS-3341] Unified Memory Manager initial implementation
    
    This patch brings in the initial version of UMM.
    So far, operation memory (70%) and buffer pool memory (15%) are managed
    independently. UMM unifies these two and allows dynamic shifting of the
    boundary between these two memory partitions. While pinning an input,
    we first check if the input is available in the cache, and we make space
    by evicting cached objects if the required space is not available. We
    also reserve memory for the worst-case output memory (70% - sizeof(inputs))
    during pinning each input. A better approach is to reserve estimated
    output memory, however, that'd need us to add a reserve call in each
    instruction.
    
    Closes #1573
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   4 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java |  70 +++-
 .../apache/sysds/parser/ParForStatementBlock.java  |   6 +-
 .../controlprogram/caching/CacheEvictionQueue.java |  64 ++++
 .../caching/CacheMaintenanceService.java           | 105 ++++++
 .../controlprogram/caching/CacheStatistics.java    |  14 +-
 .../controlprogram/caching/CacheableData.java      |  52 ++-
 .../controlprogram/caching/FrameObject.java        |   8 +-
 .../controlprogram/caching/LazyWriteBuffer.java    | 143 +-------
 .../controlprogram/caching/MatrixObject.java       |   7 +-
 .../controlprogram/caching/TensorObject.java       |   8 +-
 .../caching/UnifiedMemoryManager.java              | 392 +++++++++++++++++----
 .../sysds/runtime/meta/TensorCharacteristics.java  |  10 +
 .../test/component/frame/FrameEvictionTest.java    |  10 +-
 .../sysds/test/functions/caching/UMMTest.java      | 100 ++++++
 src/test/scripts/functions/caching/UMMTest1.dml    |  35 ++
 16 files changed, 800 insertions(+), 228 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a870644838..03a0afb233 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -165,8 +165,8 @@ public class DMLConfig
 		_defaultVals.put(NATIVE_BLAS_DIR,        "none" );
 		_defaultVals.put(LINEAGECACHESPILL,      "true" );
 		_defaultVals.put(COMPILERASSISTED_RW,    "true" );
-		_defaultVals.put(BUFFERPOOL_LIMIT,       "15"); // 15% of total heap
-		_defaultVals.put(MEMORY_MANAGER,         "static"); // static partitioning of heap
+		_defaultVals.put(BUFFERPOOL_LIMIT,       "15"); // % of total heap
+		_defaultVals.put(MEMORY_MANAGER,         "static"); // static/unified partitioning of heap
 		_defaultVals.put(PRINT_GPU_MEMORY_INFO,  "false" );
 		_defaultVals.put(EVICTION_SHADOW_BUFFERSIZE,  "0.0" );
 		_defaultVals.put(STATS_MAX_WRAP_LEN,     "30" );
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index f96cd0200d..a58450d5f3 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -45,6 +45,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.data.SparseBlock;
@@ -74,7 +75,21 @@ public class OptimizerUtils
 	 * NOTE: it is important that MEM_UTIL_FACTOR+CacheableData.CACHING_BUFFER_SIZE &lt; 1.0
 	 */
 	public static double MEM_UTIL_FACTOR = 0.7d;
-	
+	/** Default buffer pool sizes for static (15%) and unified (85%) memory */
+	public static double DEFAULT_MEM_UTIL_FACTOR = 0.15d;
+	public static double DEFAULT_UMM_UTIL_FACTOR = 0.85d;
+
+	/** Memory managers (static partitioned, unified) */
+	public enum MemoryManager {
+		STATIC_MEMORY_MANAGER,
+		UNIFIED_MEMORY_MANAGER
+	}
+
+	/** Indicate the current memory manager in effect */
+	public static MemoryManager MEMORY_MANAGER = null;
+	/** Buffer pool size in bytes */
+	public static long BUFFER_POOL_SIZE = 0;
+
 	/** Default blocksize if unspecified or for testing purposes */
 	public static final int DEFAULT_BLOCKSIZE = 1000;
 	
@@ -468,6 +483,59 @@ public class OptimizerUtils
 		double ret = InfrastructureAnalyzer.getLocalMaxMemory();
 		return ret * OptimizerUtils.MEM_UTIL_FACTOR;
 	}
+
+	/**
+	 * Returns buffer pool size as set in the config
+	 *
+	 * @return buffer pool size in bytes
+	 */
+	public static long getBufferPoolLimit() {
+		if (BUFFER_POOL_SIZE != 0)
+			return BUFFER_POOL_SIZE;
+		DMLConfig conf = ConfigurationManager.getDMLConfig();
+		double bufferPoolFactor = (double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100;
+		bufferPoolFactor = Math.max(bufferPoolFactor, DEFAULT_MEM_UTIL_FACTOR);
+		long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+		return (long)(bufferPoolFactor * maxMem);
+	}
+
+	/**
+	 * Check if unified memory manager is in effect
+	 * @return boolean
+	 */
+	public static boolean isUMMEnabled() {
+		if (MEMORY_MANAGER == null) {
+			DMLConfig conf = ConfigurationManager.getDMLConfig();
+			boolean isUMM = conf.getTextValue(DMLConfig.MEMORY_MANAGER).equalsIgnoreCase("unified");
+			MEMORY_MANAGER = isUMM ? MemoryManager.UNIFIED_MEMORY_MANAGER : MemoryManager.STATIC_MEMORY_MANAGER;
+		}
+		return MEMORY_MANAGER == MemoryManager.UNIFIED_MEMORY_MANAGER;
+	}
+
+	/**
+	 * Disable unified memory manager and fallback to static partitioning.
+	 * Initialize LazyWriteBuffer with the default size (15%).
+	 */
+	public static void disableUMM() {
+		MEMORY_MANAGER = MemoryManager.STATIC_MEMORY_MANAGER;
+		LazyWriteBuffer.cleanup();
+		LazyWriteBuffer.init();
+		long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+		BUFFER_POOL_SIZE = (long) (DEFAULT_MEM_UTIL_FACTOR * maxMem);
+		LazyWriteBuffer.setWriteBufferLimit(BUFFER_POOL_SIZE);
+	}
+
+	/**
+	 * Enable unified memory manager and initialize with the default size (85%).
+	 */
+	public static void enableUMM() {
+		MEMORY_MANAGER = MemoryManager.UNIFIED_MEMORY_MANAGER;
+		UnifiedMemoryManager.cleanup();
+		UnifiedMemoryManager.init();
+		long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+		BUFFER_POOL_SIZE = (long) (DEFAULT_UMM_UTIL_FACTOR * maxMem);
+		UnifiedMemoryManager.setUMMLimit(BUFFER_POOL_SIZE);
+	}
 	
 	public static boolean isMaxLocalParallelism(int k) {
 		return InfrastructureAnalyzer.getLocalParallelism() == k;
diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
index 607641c7ff..130495d03b 100644
--- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
@@ -363,7 +363,11 @@ public class ParForStatementBlock extends ForStatementBlock
 			_fncache.clear();
 		
 		LOG.debug("INFO: PARFOR("+_PID+"): validate successful (no dependencies) in "+time.stop()+"ms.");
-		
+
+		//disable UMM if in effect and fallback to lazy write buffer
+		if (OptimizerUtils.isUMMEnabled())
+			OptimizerUtils.disableUMM();
+
 		return vs;
 	}
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java
new file mode 100644
index 0000000000..0b2e3ac8b7
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.caching;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CacheEvictionQueue extends LinkedHashMap<String, ByteBuffer>
+{
+	/**
+	 * Extended LinkedHashMap with convenience methods for adding and removing
+	 * last/first entries.
+	 *
+	 */
+	private static final long serialVersionUID = -5208333402581364859L;
+
+	public void addLast( String fname, ByteBuffer bbuff ) {
+		//put entry into eviction queue w/ 'addLast' semantics
+		put(fname, bbuff);
+	}
+
+	public Map.Entry<String, ByteBuffer> removeFirst()
+	{
+		//move iterator to first entry
+		Iterator<Map.Entry<String, ByteBuffer>> iter = entrySet().iterator();
+		Map.Entry<String, ByteBuffer> entry = iter.next();
+
+		//remove current iterator entry
+		iter.remove();
+
+		return entry;
+	}
+
+	public Map.Entry<String, ByteBuffer> removeFirstUnpinned(List<String> pinnedList) {
+		//move iterator to first entry
+		Iterator<Map.Entry<String, ByteBuffer>> iter = entrySet().iterator();
+		var entry = iter.next();
+		while (pinnedList.contains(entry.getKey()))
+			entry = iter.next();
+
+		//remove current iterator entry
+		iter.remove();
+		return entry;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java
new file mode 100644
index 0000000000..21010781d8
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.caching;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class CacheMaintenanceService
+{
+	protected ExecutorService _pool = null;
+
+	public CacheMaintenanceService() {
+		//create new threadpool for async cleanup
+		if( isAsync() )
+			_pool = Executors.newCachedThreadPool();
+	}
+
+	public void deleteFile(String fname) {
+		//sync or async file delete
+		if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+			_pool.submit(new CacheMaintenanceService.FileCleanerTask(fname));
+		else
+			LocalFileUtils.deleteFileIfExists(fname, true);
+	}
+
+	public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
+		//sync or async file delete
+		if( CacheableData.CACHING_ASYNC_SERIALIZE )
+			_pool.submit(new CacheMaintenanceService.DataSerializerTask(bbuff, cb));
+		else {
+			try {
+				bbuff.serializeBlock(cb);
+			}
+			catch(IOException ex) {
+				throw new DMLRuntimeException(ex);
+			}
+		}
+	}
+
+	public void close() {
+		//execute pending tasks and shutdown pool
+		if( isAsync() )
+			_pool.shutdown();
+	}
+
+	@SuppressWarnings("unused")
+	public boolean isAsync() {
+		return CacheableData.CACHING_ASYNC_FILECLEANUP
+			|| CacheableData.CACHING_ASYNC_SERIALIZE;
+	}
+
+	private static class FileCleanerTask implements Runnable {
+		private String _fname = null;
+
+		public FileCleanerTask( String fname ) {
+			_fname = fname;
+		}
+
+		@Override
+		public void run() {
+			LocalFileUtils.deleteFileIfExists(_fname, true);
+		}
+	}
+
+	private static class DataSerializerTask implements Runnable {
+		private ByteBuffer _bbuff = null;
+		private CacheBlock _cb = null;
+
+		public DataSerializerTask(ByteBuffer bbuff, CacheBlock cb) {
+			_bbuff = bbuff;
+			_cb = cb;
+		}
+
+		@Override
+		public void run() {
+			try {
+				_bbuff.serializeBlock(_cb);
+			}
+			catch(IOException ex) {
+				throw new DMLRuntimeException(ex);
+			}
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
index a001e65708..fe525be9cd 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
@@ -59,7 +59,7 @@ public class CacheStatistics
 	private static final LongAdder _numHitsLin      = new LongAdder();
 
 	//write statistics caching
-	private static final LongAdder _numWritesFSBuff = new LongAdder();
+	private static final LongAdder _numWritesBPool = new LongAdder();
 	private static final LongAdder _numWritesFS     = new LongAdder();
 	private static final LongAdder _numWritesHDFS   = new LongAdder();
 	private static final LongAdder _numWritesLin    = new LongAdder();
@@ -77,7 +77,7 @@ public class CacheStatistics
 		_numHitsFS.reset();
 		_numHitsHDFS.reset();
 		
-		_numWritesFSBuff.reset();
+		_numWritesBPool.reset();
 		_numWritesFS.reset();
 		_numWritesHDFS.reset();
 		_numWritesLin.reset();
@@ -148,16 +148,16 @@ public class CacheStatistics
 		return _numHitsLin.longValue();
 	}
 
-	public static void incrementFSBuffWrites() {
-		_numWritesFSBuff.increment();
+	public static void incrementBPoolWrites() {
+		_numWritesBPool.increment();
 	}
 	
 	public static void incrementFSBuffWrites(int delta) {
-		_numWritesFSBuff.add(delta);
+		_numWritesBPool.add(delta);
 	}
 	
 	public static long getFSBuffWrites() {
-		return _numWritesFSBuff.longValue();
+		return _numWritesBPool.longValue();
 	}
 	
 	public static void incrementFSWrites() {
@@ -247,7 +247,7 @@ public class CacheStatistics
 		StringBuilder sb = new StringBuilder();
 		sb.append(_numWritesLin.longValue());
 		sb.append("/");
-		sb.append(_numWritesFSBuff.longValue());
+		sb.append(_numWritesBPool.longValue());
 		sb.append("/");
 		sb.append(_numWritesFS.longValue());
 		sb.append("/");
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 18ca1a4f55..3cee338f7d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -80,13 +80,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 
 	/** Global logging instance for all subclasses of CacheableData */
 	protected static final Log LOG = LogFactory.getLog(CacheableData.class.getName());
-	static DMLConfig conf = ConfigurationManager.getDMLConfig();
 
 	// global constant configuration parameters
-	public static final boolean UMM = conf.getTextValue(DMLConfig.MEMORY_MANAGER).equalsIgnoreCase("unified");
-	public static final long    CACHING_THRESHOLD = (long)Math.max(4*1024, //obj not s.t. caching
+	public static final long CACHING_THRESHOLD = (long)Math.max(4*1024, //obj not s.t. caching
 		1e-5 * InfrastructureAnalyzer.getLocalMaxMemory());       //if below threshold [in bytes]
-	public static final double CACHING_BUFFER_SIZE = (double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100; //15%
 	public static final RPolicy CACHING_BUFFER_POLICY = RPolicy.FIFO;
 	public static final boolean CACHING_BUFFER_PAGECACHE = false;
 	public static final boolean CACHING_WRITE_CACHE_ON_READ = false;
@@ -528,6 +525,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		//get object from cache
 		if( _data == null )
 			getCache();
+
+		if (OptimizerUtils.isUMMEnabled())
+			//track and make space in the UMM
+			UnifiedMemoryManager.pin(this);
 		
 		//call acquireHostRead if gpuHandle is set as well as is allocated
 		if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
@@ -596,9 +597,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		else if( _data!=null && DMLScript.STATISTICS ) {
 			CacheStatistics.incrementMemHits();
 		}
-		
+
 		//cache status maintenance
 		acquire( false, _data==null );
+
 		return _data;
 	}
 	
@@ -698,7 +700,11 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			//compact empty in-memory block 
 			_data.compactEmptyBlock();
 		}
-		
+
+		if (OptimizerUtils.isUMMEnabled())
+			//give the memory back to UMM
+			UnifiedMemoryManager.unpin(this);
+
 		//cache status maintenance (pass cacheNoWrite flag)
 		release(_isAcquireFromEmpty && !_requiresLocalWrite);
 		
@@ -709,7 +715,11 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			if( ( write && !hasValidLineage() ) || _requiresLocalWrite ) {
 				String filePath = getCacheFilePathAndName();
 				try {
-					LazyWriteBuffer.writeBlock(filePath, _data);
+					//write into the buffer pool
+					if (OptimizerUtils.isUMMEnabled())
+						UnifiedMemoryManager.writeBlock(filePath, _data);
+					else
+						LazyWriteBuffer.writeBlock(filePath, _data);
 				}
 				catch (Exception e) {
 					throw new DMLRuntimeException("Eviction to local path " + filePath + " ("+hashCode()+") failed.", e);
@@ -1019,8 +1029,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			LOG.trace("CACHE: Freeing evicted matrix...  " + hashCode() + "  HDFS path: " + 
 				(_hdfsFileName == null ? "null" : _hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
 		
-		if(isCachingActive())
-			LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
+		if(isCachingActive()) {
+			if (OptimizerUtils.isUMMEnabled())
+				UnifiedMemoryManager.deleteBlock(cacheFilePathAndName);
+			else
+				LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
+		}
 		
 		if( LOG.isTraceEnabled() )
 			LOG.trace("Freeing evicted matrix - COMPLETED ... " + (System.currentTimeMillis()-begin) + " msec.");
@@ -1031,7 +1045,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	}
 	
 	public static boolean isBelowCachingThreshold(CacheBlock data) {
-		return LazyWriteBuffer.getCacheBlockSize(data) <= CACHING_THRESHOLD;
+		boolean ret;
+		if (OptimizerUtils.isUMMEnabled())
+			ret = UnifiedMemoryManager.getCacheBlockSize(data) <= CACHING_THRESHOLD;
+		else
+			ret = LazyWriteBuffer.getCacheBlockSize(data) <= CACHING_THRESHOLD;
+		return ret;
 	}
 	
 	public long getDataSize() {
@@ -1355,6 +1374,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	public synchronized static void cleanupCacheDir() {
 		//cleanup remaining cached writes
 		LazyWriteBuffer.cleanup();
+		UnifiedMemoryManager.cleanup();
 		
 		//delete cache dir and files
 		cleanupCacheDir(true);
@@ -1420,13 +1440,17 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			throw new IOException(e);
 		}
 	
-		//init write-ahead buffer
-		LazyWriteBuffer.init();
+		if (OptimizerUtils.isUMMEnabled())
+			//init unified memory manager
+			UnifiedMemoryManager.init();
+		else
+			//init write-ahead buffer
+			LazyWriteBuffer.init();
+
 		_refBCs.set(0);
-		
 		_activeFlag = true; //turn on caching
 	}
-	
+
 	public static boolean isCachingActive() {
 		return _activeFlag;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 448538833f..84ff65192d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -171,7 +172,12 @@ public class FrameObject extends CacheableData<FrameBlock>
 	
 	@Override
 	protected FrameBlock readBlobFromCache(String fname) throws IOException {
-		return (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
+		FrameBlock fb = null;
+		if (OptimizerUtils.isUMMEnabled())
+			fb = (FrameBlock) UnifiedMemoryManager.readBlock(fname, false);
+		else
+			fb = (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
+		return fb;
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index 9ca079f47e..53d281631d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -20,15 +20,11 @@
 package org.apache.sysds.runtime.controlprogram.caching;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
 public class LazyWriteBuffer 
@@ -39,23 +35,17 @@ public class LazyWriteBuffer
 	}
 	
 	//global size limit in bytes
-	private static final long _limit;
+	private static long _limit;
 	
 	//current size in bytes
 	private static long _size;
 	
 	//eviction queue of <filename,buffer> pairs (implemented via linked hash map
 	//for (1) queue semantics and (2) constant time get/insert/delete operations)
-	private static EvictionQueue _mQueue;
+	private static CacheEvictionQueue _mQueue;
 	
 	//maintenance service for synchronous or asynchronous delete of evicted files
-	private static MaintenanceService _fClean;
-	
-	static {
-		//obtain the logical buffer size in bytes
-		long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
-		_limit = (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem);
-	}
+	private static CacheMaintenanceService _fClean;
 	
 	public static int writeBlock(String fname, CacheBlock cb)
 		throws IOException
@@ -104,7 +94,7 @@ public class LazyWriteBuffer
 			_fClean.serializeData(bbuff, cb);
 			
 			if( DMLScript.STATISTICS ) {
-				CacheStatistics.incrementFSBuffWrites();
+				CacheStatistics.incrementBPoolWrites();
 				CacheStatistics.incrementFSWrites(numEvicted);
 			}
 		}
@@ -120,7 +110,7 @@ public class LazyWriteBuffer
 		
 		return numEvicted;
 	}
-	
+
 	public static void deleteBlock(String fname)
 	{
 		boolean requiresDelete = true;
@@ -180,8 +170,9 @@ public class LazyWriteBuffer
 	}
 
 	public static void init() {
-		_mQueue = new EvictionQueue();
-		_fClean = new MaintenanceService();
+		_mQueue = new CacheEvictionQueue();
+		_fClean = new CacheMaintenanceService();
+		_limit = OptimizerUtils.getBufferPoolLimit();
 		_size = 0;
 		if( CacheableData.CACHING_BUFFER_PAGECACHE )
 			PageCache.init();
@@ -201,6 +192,10 @@ public class LazyWriteBuffer
 		//dynamically adjusted in a parfor context, which wouldn't reflect the actual size
 		return _limit;
 	}
+
+	public static void setWriteBufferLimit(long limit) {
+		_limit = limit;
+	}
 	
 	public static long getWriteBufferSize() {
 		synchronized( _mQueue ) {
@@ -281,116 +276,4 @@ public class LazyWriteBuffer
 	public static ExecutorService getUtilThreadPool() {
 		return _fClean != null ? _fClean._pool : null;
 	}
-	
-	/**
-	 * Extended LinkedHashMap with convenience methods for adding and removing
-	 * last/first entries.
-	 * 
-	 */
-	private static class EvictionQueue extends LinkedHashMap<String, ByteBuffer>
-	{
-		private static final long serialVersionUID = -5208333402581364859L;
-		
-		public void addLast( String fname, ByteBuffer bbuff ) {
-			//put entry into eviction queue w/ 'addLast' semantics
-			put(fname, bbuff);
-		}
-		
-		public Entry<String, ByteBuffer> removeFirst()
-		{
-			//move iterator to first entry
-			Iterator<Entry<String, ByteBuffer>> iter = entrySet().iterator();
-			Entry<String, ByteBuffer> entry = iter.next();
-			
-			//remove current iterator entry
-			iter.remove();
-			
-			return entry;
-		}
-	}
-	
-	/**
-	 * Maintenance service for abstraction of synchronous and asynchronous
-	 * file cleanup on rmvar/cpvar as well as serialization of matrices and
-	 * frames. The thread pool for asynchronous cleanup may increase the 
-	 * number of threads temporarily to the number of concurrent delete tasks
-	 * (which is bounded to the parfor degree of parallelism).
-	 */
-	private static class MaintenanceService
-	{
-		private ExecutorService _pool = null;
-		
-		public MaintenanceService() {
-			//create new threadpool for async cleanup
-			if( isAsync() )
-				_pool = Executors.newCachedThreadPool();
-		}
-		
-		public void deleteFile(String fname) {
-			//sync or async file delete
-			if( CacheableData.CACHING_ASYNC_FILECLEANUP )
-				_pool.submit(new FileCleanerTask(fname));
-			else
-				LocalFileUtils.deleteFileIfExists(fname, true);
-		}
-		
-		public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
-			//sync or async file delete
-			if( CacheableData.CACHING_ASYNC_SERIALIZE )
-				_pool.submit(new DataSerializerTask(bbuff, cb));
-			else {
-				try {
-					bbuff.serializeBlock(cb);
-				}
-				catch(IOException ex) {
-					throw new DMLRuntimeException(ex);
-				}
-			}
-		}
-		
-		public void close() {
-			//execute pending tasks and shutdown pool
-			if( isAsync() )
-				_pool.shutdown();
-		}
-		
-		@SuppressWarnings("unused")
-		public boolean isAsync() {
-			return CacheableData.CACHING_ASYNC_FILECLEANUP 
-				|| CacheableData.CACHING_ASYNC_SERIALIZE;
-		}
-		
-		private static class FileCleanerTask implements Runnable {
-			private String _fname = null;
-			
-			public FileCleanerTask( String fname ) {
-				_fname = fname;
-			}
-			
-			@Override
-			public void run() {
-				LocalFileUtils.deleteFileIfExists(_fname, true);
-			}
-		}
-		
-		private static class DataSerializerTask implements Runnable {
-			private ByteBuffer _bbuff = null;
-			private CacheBlock _cb = null;
-			
-			public DataSerializerTask(ByteBuffer bbuff, CacheBlock cb) {
-				_bbuff = bbuff;
-				_cb = cb;
-			}
-			
-			@Override
-			public void run() {
-				try {
-					_bbuff.serializeBlock(_cb);
-				}
-				catch(IOException ex) {
-					throw new DMLRuntimeException(ex);
-				}
-			}
-		}
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 6afefcfe23..ae5ca10eca 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -412,7 +412,12 @@ public class MatrixObject extends CacheableData<MatrixBlock> {
 
 	@Override
 	protected MatrixBlock readBlobFromCache(String fname) throws IOException {
-		return (MatrixBlock) LazyWriteBuffer.readBlock(fname, true);
+		MatrixBlock mb = null;
+		if (OptimizerUtils.isUMMEnabled())
+			mb = (MatrixBlock) UnifiedMemoryManager.readBlock(fname, true);
+		else
+			mb = (MatrixBlock) LazyWriteBuffer.readBlock(fname, true);
+		return mb;
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
index 6b29d41e60..248f3fc226 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
@@ -26,6 +26,7 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
@@ -104,7 +105,12 @@ public class TensorObject extends CacheableData<TensorBlock> {
 
 	@Override
 	protected TensorBlock readBlobFromCache(String fname) throws IOException {
-		return (TensorBlock) LazyWriteBuffer.readBlock(fname, false);
+		TensorBlock tb = null;
+		if (OptimizerUtils.isUMMEnabled())
+			tb = (TensorBlock) UnifiedMemoryManager.readBlock(fname, false);
+		else
+			tb = (TensorBlock) LazyWriteBuffer.readBlock(fname, false);
+		return tb;
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
index 48420b872b..e4b7761307 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
@@ -20,6 +20,15 @@
 package org.apache.sysds.runtime.controlprogram.caching;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Unified Memory Manager - Initial Design
@@ -44,7 +53,7 @@ import org.apache.commons.lang.NotImplementedException;
  * they can occupy, meaning that the boundary for the areas can shift dynamically depending
  * on the current load. Most importantly, though, dirty objects must not be counted twice
  * when pinning such an object for an operation. The min/max constraints are not exposed but
- * configured internally. An good starting point are the following constraints (relative to
+ * configured internally. A good starting point are the following constraints (relative to
  * JVM max heap size):
  * ___________________________
  * | operations  | 0%  | 70% | (pin requests always accepted)
@@ -63,94 +72,341 @@ import org.apache.commons.lang.NotImplementedException;
  *      eviction. All pin requests have to be accepted, and once a non-dirty object is released
  *      (unpinned) it can be dropped without persisting it to local FS.
  *
+ * Example Scenarios for an Operation:
+ *  (1) Inputs are available in the UMM, enough space left for the output.
+ *  (2) Some inputs are pre-evicted. Read and pin those in the operational memory.
+ *  (3) Inputs are available in the UMM, not enough space left for the output.
+ *  	Evict cached objects to reserve worst-case output memory.
+ *  (4) Some inputs are pre-evicted and not enough space left for the inputs
+ *  	and output. Evict cached objects to make space for the inputs.
+ *  	Evict cached objects to reserve worst-case output memory.
+ *
  * Thread-safeness:
  * Initially, the UMM will be used in an instance-based manner. For global visibility and
  * use in parallel for loops, the UMM would need to provide a static, synchronized API, but
  * this constitutes a source of severe contention. In the future, we will consider a design
  * with thread-local UMMs for the individual parfor workers.
- *
- * Testing:
- * The UMM will be developed bottom up, and thus initially tested via component tests for
- * evaluating the eviction behavior for sequences of API requests. 
  */
+
 public class UnifiedMemoryManager
 {
-	public UnifiedMemoryManager(long capacity) {
-		//TODO implement
-		throw new NotImplementedException();
+	// Maximum size of UMM in bytes (default 85%)
+	private static long _limit;
+	// Current total size of the cached objects
+	private static long _totCachedSize;
+	// Operational memory limit in bytes (70%)
+	private static long _opMemLimit;
+	// List of pinned entries
+	private static final List<String> _pinnedEntries = new ArrayList<String>();
+
+	// Eviction queue of <filename,buffer> pairs (implemented via linked hash map
+	// for (1) queue semantics and (2) constant time get/insert/delete operations)
+	private static CacheEvictionQueue _mQueue;
+
+	// Maintenance service for synchronous or asynchronous delete of evicted files
+	private static CacheMaintenanceService _fClean;
+
+	// Pinned size of physical memory. Starts from 0 for each operation. Max is 70% of heap
+	// This increases only if the input is not present in the cache and read from FS/rdd/fed/gpu
+	private static long _pinnedPhysicalMemSize = 0;
+	// Size of pinned virtual memory. This tracks the total input size
+	// This increases if the input is available in the cache.
+	private static long _pinnedVirtualMemSize = 0;
+
+	//---------------- OPERATION MEMORY MAINTENANCE -------------------//
+
+	// Make space for and track a cache block to be pinned in operation memory
+	public static void pin(CacheableData<?> cd) {
+		if (!CacheableData.isCachingActive()) {
+			return;
+		}
+
+		// Space accounting based on an estimated size and before reading the blob
+		long estimatedSize = OptimizerUtils.estimateSize(cd.getDataCharacteristics());
+		if (probe(cd))
+			// Availability in the cache means no memory overhead.
+			// We still need to track to derive the worst-case output memory
+			_pinnedVirtualMemSize += estimatedSize;
+		else {
+			// The blob will be restored from local FS, or will be read
+			// from other backends. Make space if not available.
+			makeSpace(estimatedSize);
+			_pinnedPhysicalMemSize += estimatedSize;
+		}
+		// Track the pinned entries to protect from evictions
+		_pinnedEntries.add(cd.getCacheFilePathAndName());
+
+		// Reserve space for output after pinning every input.
+		// This overly conservative approach removes the need to call reserveOutputMem() from
+		// each instruction. Ideally, every instruction first pins all the inputs, followed
+		// by reserving space for the output.
+		reserveOutputMem();
 	}
-	
-	/**
-	 * Pins a cache block into operation memory.
-	 * 
-	 * @param key    unique identifier and local FS filename for eviction
-	 * @param block  cache block if not under UMM control, null otherwise
-	 * @param dirty  indicator if block is dirty (subject to buffer pool management)
-	 * @return       pinned cache block, potentially restored from local FS
-	 */
-	public CacheBlock pin(String key, CacheBlock block, boolean dirty) {
-		//TODO implement
-		throw new NotImplementedException();
+
+	// Reserve space for output in the operation memory
+	public static void reserveOutputMem() {
+		if (!OptimizerUtils.isUMMEnabled() || !CacheableData.isCachingActive())
+			return;
+
+		// Worst case upper bound for output = 70% - size(inputs)
+		// FIXME: Parfor splits this 70% into smaller limits
+		long maxOutputSize = _opMemLimit - (_pinnedVirtualMemSize + _pinnedPhysicalMemSize);
+		// Evict cached entries to make space in operation memory if needed
+		makeSpace(maxOutputSize);
 	}
 	
-	/**
-	 * Pins a virtual cache block into operation memory, by making a size reservation.
-	 * The provided size is an upper bound of the actual object size, and can be
-	 * updated on unpin (once the actual cache block is provided).
-	 * 
-	 * @param key    unique identifier and local FS filename for eviction
-	 * @param size   memory reservation in operation area
-	 * @param dirty  indicator if block is dirty (subject to buffer pool management)
-	 */
-	public void pin(String key, long size, boolean dirty) {
-		//TODO implement
-		throw new NotImplementedException();
+	// Unpins (releases) a cache block from operation memory
+	public static void unpin(CacheableData<?> cd) {
+		if (!CacheableData.isCachingActive())
+			return;
+
+		// TODO: Track preserved output memory to protect from concurrent threads
+		if (!_pinnedEntries.contains(cd.getCacheFilePathAndName()))
+			return; //unpinned. output of an instruction
+
+		// We still use the estimated size even though we have the blobs available.
+		// This makes sure we are subtracting exactly what we added during pinning.
+		long estimatedSize = OptimizerUtils.estimateSize(cd.getDataCharacteristics());
+		if (probe(cd))
+			_pinnedVirtualMemSize -= estimatedSize;
+		else
+			_pinnedPhysicalMemSize -= estimatedSize;
+
+		_pinnedEntries.remove(cd.getCacheFilePathAndName());
 	}
-	
-	/**
-	 * Unpins (releases) a cache block from operation memory. Dirty objects
-	 * are logically moved back to the buffer pool area.
-	 * 
-	 * @param key    unique identifier and local FS filename for eviction
-	 */
-	public void unpin(String key) {
-		//TODO implement
-		throw new NotImplementedException();
+
+	//---------------- UMM MAINTENANCE & LOOKUP -------------------//
+
+	// Initialize the unified memory manager
+	public static void init() {
+		_mQueue = new CacheEvictionQueue();
+		_fClean = new CacheMaintenanceService();
+		_limit = OptimizerUtils.getBufferPoolLimit();
+		_opMemLimit = (long)(OptimizerUtils.getLocalMemBudget()); //70% of heap
+		_totCachedSize = 0;
+		_pinnedPhysicalMemSize = 0;
+		_pinnedVirtualMemSize = 0;
+		if( CacheableData.CACHING_BUFFER_PAGECACHE )
+			PageCache.init();
 	}
-	
-	/**
-	 * Unpins (releases) a cache block from operation memory. If the size of
-	 * the provided cache block differs from the UMM meta data, the UMM meta
-	 * data is updated. Use cases include update-in-place operations and
-	 * size reservations via worst-case upper bound estimates.
-	 * 
-	 * @param key    unique identifier and local FS filename for eviction
-	 * @param block  cache block which may be under UMM control, if null ignored
-	 */
-	public void unpin(String key, CacheBlock block) {
-		//TODO implement
-		throw new NotImplementedException();
+
+	// Cleanup the unified memory manager
+	public static void cleanup() {
+		if( _mQueue != null )
+			_mQueue.clear();
+		if( _fClean != null )
+			_fClean.close();
+		if( CacheableData.CACHING_BUFFER_PAGECACHE )
+			PageCache.clear();
+		_totCachedSize = 0;
+		_pinnedPhysicalMemSize = 0;
+		_pinnedVirtualMemSize = 0;
 	}
-	
-	/**
-	 * Removes a cache block associated with the given key from all memory
-	 * areas, and deletes evicted representations (files in local FS). The
-	 * local file system deletes can happen asynchronously.
-	 * 
-	 * @param key    unique identifier and local FS filename for eviction
-	 */
-	public void delete(String key) {
-		//TODO implement
-		throw new NotImplementedException();
+
+	public static void setUMMLimit(long val) {
+		_limit = val;
 	}
-	
+
+	public static long getUMMSize() {
+		synchronized(_mQueue) {
+			return _limit;
+		}
+	}
+
+	// Get the available memory in UMM
+	public static long getUMMFree() {
+		synchronized(_mQueue) {
+			return _limit - (_totCachedSize + _pinnedPhysicalMemSize);
+		}
+	}
+
+	// Reads a cached object. This is called from cacheabledata implementations
+	public static CacheBlock readBlock(String fname, boolean matrix)
+		throws IOException
+	{
+		CacheBlock cb = null;
+		ByteBuffer ldata = null;
+
+		//probe write buffer
+		synchronized (_mQueue)
+		{
+			ldata = _mQueue.get(fname);
+
+			//modify eviction order (accordingly to access)
+			if (CacheableData.CACHING_BUFFER_POLICY == LazyWriteBuffer.RPolicy.LRU
+				&& ldata != null)
+			{
+				//reinsert entry at end of eviction queue
+				_mQueue.remove (fname);
+				_mQueue.addLast (fname, ldata);
+			}
+		}
+
+		//deserialize or read from FS if required
+		if( ldata != null )
+		{
+			cb = ldata.deserializeBlock();
+			if (DMLScript.STATISTICS)
+				CacheStatistics.incrementFSBuffHits();
+		}
+		else
+		{
+			cb = LocalFileUtils.readCacheBlockFromLocal(fname, matrix);
+			if (DMLScript.STATISTICS)
+				CacheStatistics.incrementFSHits();
+		}
+
+		return cb;
+	}
+
+	public static boolean probe(CacheableData<?> cd) {
+		String filePath = cd.getCacheFilePathAndName();
+		return _mQueue.containsKey(filePath);
+	}
+
+	// Make required space. Evict if needed.
+	public static int makeSpace(long reqSpace) {
+		int numEvicted = 0;
+		// Check if sufficient space is already available
+		if (getUMMFree() > reqSpace)
+			return numEvicted;
+
+		// Evict cached objects to make space
+		try {
+			synchronized(_mQueue) {
+				// Evict blobs to make room (by default FIFO)
+				while (getUMMFree() < reqSpace && !_mQueue.isEmpty()) {
+					//remove first unpinned entry from eviction queue
+					var entry = _mQueue.removeFirstUnpinned(_pinnedEntries);
+					String ftmp = entry.getKey();
+					ByteBuffer bb = entry.getValue();
+
+					if(bb != null) {
+						// Wait for pending serialization
+						bb.checkSerialized();
+						// Evict object
+						bb.evictBuffer(ftmp);
+						bb.freeMemory();
+						_totCachedSize -= bb.getSize();
+						numEvicted++;
+					}
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new DMLRuntimeException("Eviction request of size "+(reqSpace-getUMMFree())+ " in the UMM failed.", e);
+		}
+
+		if( DMLScript.STATISTICS )
+			CacheStatistics.incrementFSWrites(numEvicted);
+
+		return numEvicted;
+	}
+
+	// Write an object to the cache
+	public static int writeBlock(String fname, CacheBlock cb)
+		throws IOException
+	{
+		//obtain basic metadata of the cache block
+		long lSize = getCacheBlockSize(cb);
+		boolean requiresWrite = (lSize > _limit        //global buffer limit
+			|| !ByteBuffer.isValidCapacity(lSize, cb)); //local buffer limit
+		int numEvicted = 0;
+
+		// Handle caching/eviction if it fits in UMM
+		if( !requiresWrite )
+		{
+			// Create byte buffer handle (no block allocation yet)
+			ByteBuffer bbuff = new ByteBuffer( lSize );
+
+			// Modify buffer pool
+			synchronized( _mQueue )
+			{
+				// Evict blocks to make room if required
+				numEvicted += makeSpace(lSize);
+				// Put placeholder into buffer pool (reserve mem)
+				_mQueue.addLast(fname, bbuff);
+				_totCachedSize += lSize;
+			}
+
+			// Serialize matrix (outside synchronized critical path)
+			_fClean.serializeData(bbuff, cb);
+
+			if( DMLScript.STATISTICS )
+				CacheStatistics.incrementBPoolWrites();
+		}
+		else
+		{
+			// Write directly to local FS (bypass buffer if too large)
+			LocalFileUtils.writeCacheBlockToLocal(fname, cb);
+			if( DMLScript.STATISTICS ) {
+				CacheStatistics.incrementFSWrites();
+			}
+			numEvicted++;
+		}
+
+		return numEvicted;
+	}
+
+	public static long getCacheBlockSize(CacheBlock cb) {
+		return cb.isShallowSerialize() ?
+			cb.getInMemorySize() : cb.getExactSerializedSize();
+	}
+
+	public static void deleteBlock(String fname)
+	{
+		boolean requiresDelete = true;
+
+		synchronized( _mQueue )
+		{
+			//remove queue entry
+			ByteBuffer ldata = _mQueue.remove(fname);
+			if( ldata != null ) {
+				_totCachedSize -= ldata.getSize();
+				requiresDelete = false;
+				ldata.freeMemory(); //cleanup
+			}
+		}
+
+		//delete from FS if required
+		if( requiresDelete )
+			_fClean.deleteFile(fname);
+	}
+
 	/**
 	 * Removes all cache blocks from all memory areas and deletes all evicted
 	 * representations (files in local FS). All internally thread pools must be
-	 * shut down in a gracefully manner (e.g., wait for pending deletes).
+	 * shut down in a graceful manner (e.g., wait for pending deletes).
 	 */
 	public void deleteAll() {
 		//TODO implement
 		throw new NotImplementedException();
 	}
+
+	/**
+	 * Evicts all buffer pool entries.
+	 * NOTE: use only for debugging or testing.
+	 *
+	 * @throws IOException if IOException occurs
+	 */
+	public static void forceEviction()
+		throws IOException
+	{
+		//evict all matrices and frames
+		while( !_mQueue.isEmpty() )
+		{
+			//remove first entry from eviction queue
+			Map.Entry<String, ByteBuffer> entry = _mQueue.removeFirst();
+			ByteBuffer tmp = entry.getValue();
+
+			if( tmp != null ) {
+				//wait for pending serialization
+				tmp.checkSerialized();
+
+				//evict matrix
+				tmp.evictBuffer(entry.getKey());
+				tmp.freeMemory();
+			}
+		}
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java b/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
index 2b554a261f..20e11d955f 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
@@ -151,6 +151,16 @@ public class TensorCharacteristics extends DataCharacteristics
 		return _nnz;
 	}
 
+	@Override
+	public long getRows() {
+		return getDim(0);
+	}
+
+	@Override
+	public long getCols() {
+		return getDim(1);
+	}
+
 	@Override
 	public String toString() {
 		return "["+Arrays.toString(_dims)+", nnz="+_nnz + ", blocksize= "+_blocksize+"]";
diff --git a/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java b/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
index d8c3d452e8..6c966e0f98 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
@@ -21,6 +21,8 @@ package org.apache.sysds.test.component.frame;
 
 import java.lang.reflect.Method;
 
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
 import org.junit.Test;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
@@ -208,8 +210,12 @@ public class FrameEvictionTest extends AutomatedTestBase
 			fo.release();
 			
 			//evict frame and clear in-memory reference
-			if( force )
-				LazyWriteBuffer.forceEviction();
+			if( force ) {
+				if (OptimizerUtils.isUMMEnabled())
+					UnifiedMemoryManager.forceEviction();
+				else
+					LazyWriteBuffer.forceEviction();
+			}
 			Method clearfo = CacheableData.class
 					.getDeclaredMethod("clearCache", new Class[]{});
 			clearfo.setAccessible(true); //make method public
diff --git a/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java b/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java
new file mode 100644
index 0000000000..0bb61a83dd
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysds.test.functions.caching;
+
+	import java.util.ArrayList;
+	import java.util.HashMap;
+	import java.util.List;
+
+	import org.apache.sysds.hops.OptimizerUtils;
+	import org.apache.sysds.hops.recompile.Recompiler;
+	import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
+	import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
+	import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+	import org.apache.sysds.runtime.matrix.data.MatrixValue;
+	import org.apache.sysds.test.AutomatedTestBase;
+	import org.apache.sysds.test.TestConfiguration;
+	import org.apache.sysds.test.TestUtils;
+	import org.junit.Assert;
+	import org.junit.Test;
+
+public class UMMTest extends AutomatedTestBase {
+
+	protected static final String TEST_DIR = "functions/caching/";
+	protected static final String TEST_NAME1 = "UMMTest1";
+
+	protected String TEST_CLASS_DIR = TEST_DIR + org.apache.sysds.test.functions.caching.UMMTest.class.getSimpleName() + "/";
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1));
+	}
+
+	@Test
+	public void testEvictionOrder() {
+		runTest(TEST_NAME1);
+	}
+
+	public void runTest(String testname)
+	{
+		try {
+			getAndLoadTestConfiguration(testname);
+			fullDMLScriptName = getScript();
+			long oldBufferPool = (long)(0.15 * InfrastructureAnalyzer.getLocalMaxMemory());
+
+			// Static memory management
+			List<String> proArgs = new ArrayList<>();
+			proArgs.add("-stats");
+			proArgs.add("-args");
+			proArgs.add(String.valueOf(oldBufferPool));
+			proArgs.add(output("R"));
+			programArgs = proArgs.toArray(new String[proArgs.size()]);
+			runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+			//HashMap<MatrixValue.CellIndex, Double> R_static = readDMLMatrixFromOutputDir("R");
+			HashMap<MatrixValue.CellIndex, Double> R_static = readDMLScalarFromOutputDir("R");
+			long FSwrites_static = CacheStatistics.getFSWrites();
+
+			// Unified memory management (cache size = 85% of heap)
+			OptimizerUtils.enableUMM();
+			proArgs.clear();
+			proArgs.add("-stats");
+			proArgs.add("-args");
+			proArgs.add(String.valueOf(oldBufferPool));
+			proArgs.add(output("R"));
+			programArgs = proArgs.toArray(new String[proArgs.size()]);
+			runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+			UnifiedMemoryManager.cleanup();
+			HashMap<MatrixValue.CellIndex, Double> R_unified= readDMLScalarFromOutputDir("R");
+			long FSwrites_unified = CacheStatistics.getFSWrites();
+
+			// Compare results
+			TestUtils.compareMatrices(R_static, R_unified, 1e-6, "static", "unified");
+			// Compare FS write counts (#unified FS writes always smaller than #static FS writes)
+			Assert.assertTrue("Violated buffer pool eviction counts: "+FSwrites_unified+" <= "+FSwrites_static,
+				FSwrites_unified <= FSwrites_static);
+		}
+		finally {
+			Recompiler.reinitRecompiler();
+		}
+	}
+}
+
diff --git a/src/test/scripts/functions/caching/UMMTest1.dml b/src/test/scripts/functions/caching/UMMTest1.dml
new file mode 100644
index 0000000000..914f598930
--- /dev/null
+++ b/src/test/scripts/functions/caching/UMMTest1.dml
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+ummsize = $1;
+nrows = ceil(sqrt(ummsize*0.40/8));
+nrows = 4858;
+X = rand(rows=nrows, cols=nrows, seed=42);
+Y = rand(rows=nrows, cols=nrows, seed=43);
+
+R1 = X + Y;
+R2 = X * Y;
+
+R3 = R2 - R1;
+while(FALSE) {}
+R = sum(R3);
+write(R, $2, format="text");
+