You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/09/14 19:21:10 UTC

systemml git commit: [MINOR] Add two helper utilities.

Repository: systemml
Updated Branches:
  refs/heads/master e2dc85688 -> 4d8df33cc


[MINOR] Add two helper utilities.

- First, PersistentLRUCache to cache double[], float[] and MatrixBlock without requiring the user to worry about OOM.
- Second, reblockAndWrite method in MLContextUtil class to reblock the output of a DML script as rectangular blocked RDDs.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4d8df33c
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4d8df33c
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4d8df33c

Branch: refs/heads/master
Commit: 4d8df33cc53583e71c4a488577270461e6f712e2
Parents: e2dc856
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Fri Sep 14 12:17:11 2018 -0700
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Fri Sep 14 12:17:11 2018 -0700

----------------------------------------------------------------------
 .../sysml/api/mlcontext/MLContextUtil.java      |  31 ++
 .../apache/sysml/utils/PersistentLRUCache.java  | 518 +++++++++++++++++++
 2 files changed, 549 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index dcf4595..c0ce6c9 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -21,6 +21,7 @@ package org.apache.sysml.api.mlcontext;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Date;
@@ -74,6 +75,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
 import org.apache.sysml.runtime.instructions.cp.StringObject;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -118,6 +120,35 @@ public final class MLContextUtil {
 	@SuppressWarnings("rawtypes")
 	public static final Class[] ALL_SUPPORTED_DATA_TYPES = (Class[]) ArrayUtils.addAll(BASIC_DATA_TYPES,
 			COMPLEX_DATA_TYPES);
+	
+	/**
+	 * Utility method to write an output as rectangular blocked RDD
+	 *  
+	 * @param spark spark session
+	 * @param dmlScript script that generates the outVariable
+	 * @param outVariable variable name
+	 * @param outFilePath output file path
+	 * @param rowsPerBlock number of rows per block
+	 * @param colsPerBlock number of columns per block 
+	 * @throws IOException if error occurs
+	 */
+	public static void reblockAndWrite(SparkSession spark, String dmlScript, String outVariable, String outFilePath, int rowsPerBlock, int colsPerBlock) throws IOException {
+		// Execute the given DML script with a fresh MLContext, registering outVariable as an output.
+		MLContext ml = new MLContext(spark);
+		Script helloScript = org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript).out(outVariable);
+		MLResults res = ml.execute(helloScript);
+		// Fetch the output as binary-block RDD along with its matrix characteristics.
+		JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = res.getMatrix(outVariable).toBinaryBlocks();
+		MatrixCharacteristics mc = res.getMatrix(outVariable).getMatrixMetadata().asMatrixCharacteristics();
+		// Target characteristics: same dimensions as the source, but the requested block sizes.
+		MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
+		mcOut.setRowsPerBlock(rowsPerBlock);
+		mcOut.setColsPerBlock(colsPerBlock);
+		// Re-block: split each source block along the target grid, then merge the pieces by target index.
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils.mergeByKey(rdd.flatMapToPair(
+				new org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBinaryReblock(mc, mcOut)), false);
+		// Persist as a Hadoop sequence file and write the accompanying metadata (.mtd) file.
+		out.saveAsHadoopFile(outFilePath, MatrixIndexes.class, MatrixBlock.class, org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+		// NOTE(review): value type is hard-coded to DOUBLE — presumably all DML matrix outputs are double; confirm.
+		org.apache.sysml.runtime.util.MapReduceTool.writeMetaDataFile(outFilePath + ".mtd", 
+				org.apache.sysml.parser.Expression.ValueType.DOUBLE, mcOut, 
+				org.apache.sysml.runtime.matrix.data.OutputInfo.BinaryBlockOutputInfo, 
+				new org.apache.sysml.runtime.io.FileFormatProperties());
+	}
 
 	/**
 	 * Compare two version strings (ie, "1.4.0" and "1.4.1").

http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
new file mode 100644
index 0000000..83a0dcf
--- /dev/null
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.SoftReference;
+import java.nio.file.Files;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
+import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+
+/**
+ * Simple utility to store double[], float[] and MatrixBlock in-memory.
+ * It is designed to guard against OOM by using soft reference as well as max capacity. 
+ * When memory is full or if capacity is exceeded, PersistentLRUCache stores the least recently used values into the local filesystem.
+ * Assumption: GC occurs before an OutOfMemoryException, and GC requires prior finalize call.
+ * 
+ * The user should use custom put and get methods:
+ * - put(String key, double[] value);
+ * - put(String key, float[] value);
+ * - put(String key, MatrixBlock value);
+ * - double [] getAsDoubleArray(String key);
+ * - float [] getAsFloatArray(String key);
+ * - MatrixBlock getAsMatrixBlock(String key);
+ * 
+ * Additionally, the user can also use standard Map methods:
+ * - void clear();
+ * - boolean containsKey(String key)
+ * - remove(String key);
+ * 
+ * Instead of using generic types i.e. LinkedHashMap<String, ?>,  we are allowing the cache to store values of different types.
+ * ValueWrapper is a container in this case to store the actual values (i.e. double[], float[] or MatrixBlock).
+ * 
+ * The cache can be used in two modes:
+ * - Read-only mode (only applicable for MatrixBlock keys): 
+ *   = We delete the value when capacity is exceeded or when GC occurs. 
+ *   = When get is invoked on the deleted key, the key is treated as the full path and MatrixBlock is read from that path.
+ *   = Note: in the current version, the metadata file is ignored and the file-format is assumed to be binary-block. We can extend this later.
+ * - General case: 
+ *   = We persist the values to the file system (into temporary directory) when capacity is exceeded or when GC occurs. 
+ *   = When get is invoked on the deleted key, the key is treated as the file name (not the absolute path) and MatrixBlock is read from that path.
+ * 
+ * This class does not assume minimum capacity and hence only soft references.
+ * 
+ * To test this class, please use the below command:
+ * java -cp systemml-*-standalone.jar:commons-lang3-3.8.jar org.apache.sysml.utils.PersistentLRUCache.
+ */
+public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
+	static final Log LOG = LogFactory.getLog(PersistentLRUCache.class.getName());
+	private static final long serialVersionUID = -6838798881747433047L;
+	// Directory on the local file system where evicted values are persisted.
+	private String _prefixFilePath;
+	// Number of bytes currently accounted for by in-memory values.
+	final AtomicLong _currentNumBytes = new AtomicLong();
+	// Maximum number of bytes the cache may hold in memory.
+	private final long _maxNumBytes;
+	Random _rand = new Random();
+	// When true, evicted values are discarded instead of written to disk (MatrixBlock-only mode).
+	boolean isInReadOnlyMode;
+	
+	/**
+	 * Simple smoke test: inserts thirty ~10MB double arrays into a ~250MB cache,
+	 * forcing evictions to the local file system.
+	 * 
+	 * @param args ignored
+	 * @throws IOException if unable to create the temporary directory
+	 */
+	public static void main(String [] args) throws IOException {
+		org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
+		double numBytesPerArray = 1e+7;
+		int numDoublesPerArray = (int) (numBytesPerArray / 8);
+		PersistentLRUCache cache = new PersistentLRUCache((long)(numBytesPerArray*25));
+		for(int i = 0; i < 30; ++i) {
+			LOG.debug("Putting a double array of size 10MB.");
+			cache.put("file_" + i, new double[numDoublesPerArray]);
+		}
+		cache.clear();
+	}
+	
+	/**
+	 * When enabled, the cache will discard the values instead of writing them to the local file system.
+	 * 
+	 * @param enable true to enable read-only mode
+	 * @return this
+	 */
+	public PersistentLRUCache enableReadOnlyMode(boolean enable) {
+		isInReadOnlyMode = enable;
+		return this;
+	}
+	
+	/**
+	 * Creates a persisting cache
+	 * @param maxNumBytes maximum capacity in bytes
+	 * @throws IOException if unable to create a temporary directory on the local file system
+	 */
+	public PersistentLRUCache(long maxNumBytes) throws IOException {
+		_maxNumBytes = maxNumBytes;
+		File tmp = Files.createTempDirectory("systemml_" + Math.abs(_rand.nextLong())).toFile();
+		tmp.deleteOnExit();
+		_prefixFilePath = tmp.getAbsolutePath();
+	}
+	public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException {
+		// Widen to long before multiplying to avoid int overflow for arrays larger than ~268M elements.
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), ((long)value.length)*Double.BYTES);
+	}
+	public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException {
+		// Widen to long before multiplying to avoid int overflow for very large arrays.
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), ((long)value.length)*Float.BYTES);
+	}
+	public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException {
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize());
+	}
+	
+	// Removes any previous value for the key, makes room for the new value, and inserts it.
+	private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException {
+		ValueWrapper prev = null;
+		if(containsKey(key))
+			prev = remove(key);
+		ensureCapacity(sizeInBytes);
+		super.put(key, value);
+		return prev;
+	}
+	
+	@Override
+	public ValueWrapper remove(Object key) {
+		ValueWrapper prev = super.remove(key);
+		if(prev != null) {
+			long size = prev.getSize();
+			if(size > 0)
+				_currentNumBytes.addAndGet(-size);
+			prev.remove(); // also deletes the persisted file for this key, if any
+		}
+		return prev;
+	}
+	
+	@Override
+	public ValueWrapper put(String key, ValueWrapper value) {
+		throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+	}
+	
+	@Override
+	public void putAll(Map<? extends String, ? extends ValueWrapper> m) {
+		throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+	}
+	
+	@Override
+	public ValueWrapper get(Object key) {
+		throw new DMLRuntimeException("Incorrect usage: Use getAsDoubleArray, getAsFloatArray or getAsMatrixBlock instead.");
+	}
+	
+	// Moves the given key to the most-recently-used position by re-inserting it.
+	void makeRecent(String key) {
+		ValueWrapper value = super.get(key);
+		super.remove(key);
+		super.put(key, value);
+	}
+	
+	@Override
+	public void clear() {
+		super.clear();
+		_currentNumBytes.set(0);
+		// Switch to a fresh temporary directory; previously persisted files are abandoned
+		// and cleaned up on JVM exit via deleteOnExit.
+		File tmp;
+		try {
+			tmp = Files.createTempDirectory("systemml_" + Math.abs(_rand.nextLong())).toFile();
+			tmp.deleteOnExit();
+			_prefixFilePath = tmp.getAbsolutePath();
+		} catch (IOException e) {
+			throw new RuntimeException("Error occurred while creating the temp directory.", e);
+		}
+	}
+	
+	// Last entry offered to removeEldestEntry; used by ensureCapacity to find the LRU victim.
+	Map.Entry<String, ValueWrapper> _eldest;
+	@Override
+    protected boolean removeEldestEntry(Map.Entry<String, ValueWrapper> eldest) {
+		_eldest = eldest;
+		return false; // Never ask LinkedHashMap to remove eldest entry, instead do that in ensureCapacity.
+    }
+	
+	float [] tmp = new float[0];
+	// Sentinel key inserted solely to trigger removeEldestEntry; randomized to avoid clashing with user keys.
+	String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
+	/**
+	 * Accounts for newNumBytes and, if the running total exceeds the maximum capacity,
+	 * evicts least-recently-used values to disk until the total fits (or all entries were visited).
+	 * 
+	 * @param newNumBytes size in bytes of the value about to be added
+	 * @throws FileNotFoundException if an eviction target file cannot be created
+	 * @throws IOException if an I/O error occurs during eviction
+	 */
+	void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException {
+		if(newNumBytes > _maxNumBytes) {
+			throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes + 
+					" bytes as max capacity is " + _maxNumBytes + " bytes.");
+		}
+		long newCapacity = _currentNumBytes.addAndGet(newNumBytes);
+		if(newCapacity > _maxNumBytes) {
+			synchronized(this) {
+				if(LOG.isDebugEnabled())
+					LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes);
+				ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this));
+				int maxIter = size();
+				while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) {
+					super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest
+					remove(dummyKey);
+					// Compare with equals (not ==): String identity comparison is fragile even
+					// though the sentinel is inserted by reference.
+					if(_eldest != null && !dummyKey.equals(_eldest.getKey())) {
+						DataWrapper data = _eldest.getValue().get();
+						if(data != null) {
+							data.write(false); // Write the eldest entry to disk if not garbage collected.
+						}
+						makeRecent(_eldest.getKey()); // Make recent.
+					}
+					maxIter--;
+				}
+			}
+		}
+	}
+	
+	// Absolute path of the file backing the given key in the current temporary directory.
+	String getFilePath(String key) {
+		return _prefixFilePath + File.separator + key;
+	}
+	
+	/**
+	 * Returns the double array stored under the given key, reloading it from disk if
+	 * the soft reference was cleared. NOTE(review): assumes the key was previously put;
+	 * an unknown key leads to a NullPointerException.
+	 * 
+	 * @param key the key
+	 * @return the double array value
+	 * @throws FileNotFoundException if the persisted file is missing
+	 * @throws IOException if an I/O error occurs while loading
+	 */
+	public double [] getAsDoubleArray(String key) throws FileNotFoundException, IOException {
+		ValueWrapper value = super.get(key);
+		if(!value.isAvailable()) {
+			// Fine-grained synchronization: only one read per key, but will allow parallel loading
+			// of distinct keys.
+			synchronized(value._lock) {
+				if(!value.isAvailable()) {
+					value.update(DataWrapper.loadDoubleArr(key, this));
+				}
+			}
+		}
+		DataWrapper ret = value.get();
+		if(ret == null)
+			throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+		return ret._dArr;
+	}
+	
+	/**
+	 * Returns the float array stored under the given key, reloading it from disk if
+	 * the soft reference was cleared.
+	 * 
+	 * @param key the key
+	 * @return the float array value
+	 * @throws FileNotFoundException if the persisted file is missing
+	 * @throws IOException if an I/O error occurs while loading
+	 */
+	public float [] getAsFloatArray(String key) throws FileNotFoundException, IOException {
+		ValueWrapper value = super.get(key);
+		if(!value.isAvailable()) {
+			// Fine-grained synchronization: only one read per key, but will allow parallel loading
+			// of distinct keys.
+			synchronized(value._lock) {
+				if(!value.isAvailable()) {
+					value.update(DataWrapper.loadFloatArr(key, this));
+				}
+			}
+		}
+		DataWrapper ret = value.get();
+		if(ret == null)
+			throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+		return ret._fArr;
+	}
+	
+	/**
+	 * Returns the MatrixBlock stored under the given key, reloading it from disk if
+	 * the soft reference was cleared. In read-only mode, the key is treated as an HDFS path.
+	 * 
+	 * @param key the key
+	 * @return the MatrixBlock value
+	 * @throws FileNotFoundException if the persisted file is missing
+	 * @throws IOException if an I/O error occurs while loading
+	 */
+	public MatrixBlock getAsMatrixBlock(String key) throws FileNotFoundException, IOException {
+		ValueWrapper value = super.get(key);
+		if(!value.isAvailable()) {
+			// Fine-grained synchronization: only one read per key, but will allow parallel loading
+			// of distinct keys.
+			synchronized(value._lock) {
+				if(!value.isAvailable()) {
+					value.update(DataWrapper.loadMatrixBlock(key, this, value._rlen, value._clen, value._nnz));
+				}
+			}
+		}
+		DataWrapper ret = value.get();
+		if(ret == null)
+			throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+		return ret._mb;
+	}
+}
+
+// ----------------------------------------------------------------------------------------
+// Internal helper class
+class DataWrapper {
+	double [] _dArr;
+	float [] _fArr;
+	MatrixBlock _mb;
+	MatrixObject _mo;
+	final PersistentLRUCache _cache;
+	final String _key;
+	DataWrapper(String key, double [] value, PersistentLRUCache cache) {
+		_key = key;
+		_dArr = value;
+		_fArr = null;
+		_mb = null;
+		_mo = null;
+		_cache = cache;
+	}
+	DataWrapper(String key, float [] value, PersistentLRUCache cache) {
+		_key = key;
+		_dArr = null;
+		_fArr = value;
+		_mb = null;
+		_mo = null;
+		_cache = cache;
+	}
+	DataWrapper(String key, MatrixBlock value, PersistentLRUCache cache) {
+		_key = key;
+		_dArr = null;
+		_fArr = null;
+		_mb = value;
+		_mo = null;
+		_cache = cache;
+	}
+	DataWrapper(String key, MatrixObject value, PersistentLRUCache cache) {
+		_key = key;
+		_dArr = null;
+		_fArr = null;
+		_mb = null;
+		_mo = value;
+		_cache = cache;
+	}
+	@Override
+	protected void finalize() throws Throwable {
+		super.finalize();
+		write(true);
+	}
+	
+	public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException {
+		if(_key.equals(_cache.dummyKey))
+			return;
+		_cache.makeRecent(_key); // Make it recent.
+		
+		if(_dArr != null || _fArr != null || _mb != null || _mo != null) {
+			_cache._currentNumBytes.addAndGet(-getSize());
+		}
+		
+		if(!_cache.isInReadOnlyMode) {
+			String debugSuffix = null;
+			if(PersistentLRUCache.LOG.isDebugEnabled()) {
+				if(isBeingGarbageCollected)
+					debugSuffix = " (is being garbage collected).";
+				else
+					debugSuffix = " (capacity exceeded).";
+			}
+			
+			if(_dArr != null) {
+				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+					os.writeInt(_dArr.length);
+					for(int i = 0; i < _dArr.length; i++) {
+						os.writeDouble(_dArr[i]);
+					}
+				}
+				if(PersistentLRUCache.LOG.isDebugEnabled())
+					PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+			}
+			else if(_fArr != null) {
+				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+					os.writeInt(_fArr.length);
+					for(int i = 0; i < _fArr.length; i++) {
+						os.writeFloat(_fArr[i]);
+					}
+				}
+				if(PersistentLRUCache.LOG.isDebugEnabled())
+					PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+			}
+			else if(_mb != null) {
+				try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) {
+					os.writeLong(_mb.getInMemorySize());
+					_mb.write(os);
+				}
+				if(PersistentLRUCache.LOG.isDebugEnabled())
+					PersistentLRUCache.LOG.debug("Writing value (MatrixBlock of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+			}
+			else if(_mo != null) {
+				throw new DMLRuntimeException("Not implemented");
+			}
+			else {
+				if(PersistentLRUCache.LOG.isDebugEnabled())
+					PersistentLRUCache.LOG.debug("Skipping writing of the key " + _key + " to disk as the value is already written" + debugSuffix);
+			}
+		}
+		_dArr = null; _fArr = null; _mb = null; _mo = null;
+	}
+	
+	static DataWrapper loadDoubleArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+		if(cache.isInReadOnlyMode)
+			throw new IOException("Read-only mode is only supported for MatrixBlock.");
+		if(PersistentLRUCache.LOG.isDebugEnabled())
+			PersistentLRUCache.LOG.debug("Loading double array the key " + key + " from the disk.");
+		double [] ret;
+		try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+			int size = is.readInt();
+			cache.ensureCapacity(size*Double.BYTES);
+			ret = new double[size];
+			for(int i = 0; i < size; i++) {
+				ret[i] = is.readDouble();
+			}
+		}
+		return new DataWrapper(key, ret, cache);
+	}
+	
+	static DataWrapper loadFloatArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+		if(cache.isInReadOnlyMode)
+			throw new IOException("Read-only mode is only supported for MatrixBlock.");
+		if(PersistentLRUCache.LOG.isDebugEnabled())
+			PersistentLRUCache.LOG.debug("Loading float array the key " + key + " from the disk.");
+		float [] ret;
+		try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+			int size = is.readInt();
+			cache.ensureCapacity(size*Float.BYTES);
+			ret = new float[size];
+			for(int i = 0; i < size; i++) {
+				ret[i] = is.readFloat();
+			}
+		}
+		return new DataWrapper(key, ret, cache);
+	}
+	
+	static DataWrapper loadMatrixBlock(String key, 
+			PersistentLRUCache cache, long rlen, long clen, long nnz) throws FileNotFoundException, IOException {
+		if(PersistentLRUCache.LOG.isDebugEnabled())
+			PersistentLRUCache.LOG.debug("Loading matrix block array the key " + key + " from the disk.");
+		MatrixBlock ret = null;
+		if(cache.isInReadOnlyMode) {
+			// Read from the filesystem in the read-only mode assuming binary-blocked format.
+			// TODO: Read the meta-data file and remove the format requirement. 
+			ret = DataConverter.readMatrixFromHDFS(key, 
+					org.apache.sysml.runtime.matrix.data.InputInfo.BinaryBlockInputInfo, rlen, clen,
+					ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), nnz, 
+					new org.apache.sysml.runtime.io.FileFormatProperties());
+		}
+		else {
+			try (FastBufferedDataInputStream is = new FastBufferedDataInputStream(new ObjectInputStream(new FileInputStream(cache.getFilePath(key))))) {
+				long size = is.readLong();
+				cache.ensureCapacity(size);
+				ret = new MatrixBlock();
+				ret.readFields(is);
+			}
+		}
+		return new DataWrapper(key, ret, cache);
+	}
+	
+	void remove() {
+		File file = new File(_cache.getFilePath(_key));
+		if(file.exists()) {
+			file.delete();
+		}
+	}
+	
+	long getSize() {
+		if(_dArr != null)
+			return _dArr.length*Double.BYTES;
+		else if(_fArr != null)
+			return _fArr.length*Float.BYTES;
+		else if(_fArr != null)
+			return _mb.getInMemorySize();
+		else
+			throw new DMLRuntimeException("Not implemented");
+	}
+	
+}
+
+// Internal helper class
+/**
+ * Pairs a soft reference to a DataWrapper with the matrix metadata
+ * (rows, columns, non-zeros) required to reload an evicted MatrixBlock.
+ */
+class ValueWrapper {
+	final Object _lock;
+	private SoftReference<DataWrapper> _ref;
+	long _rlen;
+	long _clen;
+	long _nnz;
+	
+	ValueWrapper(DataWrapper data) {
+		_lock = new Object();
+		refresh(data);
+	}
+	void update(DataWrapper data) {
+		refresh(data);
+	}
+	// Re-points the soft reference and snapshots MatrixBlock metadata so the
+	// value can be reloaded after the garbage collector clears the reference.
+	private void refresh(DataWrapper data) {
+		_ref = new SoftReference<>(data);
+		MatrixBlock mb = data._mb;
+		if(mb != null) {
+			_rlen = mb.getNumRows();
+			_clen = mb.getNumColumns();
+			_nnz = mb.getNonZeros();
+		}
+	}
+	// True while the referent has not been garbage collected.
+	boolean isAvailable() {
+		return _ref.get() != null;
+	}
+	DataWrapper get() {
+		return _ref.get();
+	}
+	// Size in bytes of the referent, or 0 if it has been collected.
+	long getSize() {
+		DataWrapper data = _ref.get();
+		return (data == null) ? 0 : data.getSize();
+	}
+	// Deletes the referent's persisted file, if the referent is still reachable.
+	void remove() {
+		DataWrapper data = _ref.get();
+		if(data != null)
+			data.remove();
+	}
+}
+