You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/09/14 19:21:10 UTC
systemml git commit: [MINOR] Add two helper utilities.
Repository: systemml
Updated Branches:
refs/heads/master e2dc85688 -> 4d8df33cc
[MINOR] Add two helper utilities.
- First, PersistentLRUCache to cache double[], float[] and MatrixBlock without requiring the user to worry about OOM.
- Second, reblockAndWrite method in MLContextUtil class to reblock the output of a DML script as rectangular blocked RDDs.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4d8df33c
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4d8df33c
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4d8df33c
Branch: refs/heads/master
Commit: 4d8df33cc53583e71c4a488577270461e6f712e2
Parents: e2dc856
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Fri Sep 14 12:17:11 2018 -0700
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Fri Sep 14 12:17:11 2018 -0700
----------------------------------------------------------------------
.../sysml/api/mlcontext/MLContextUtil.java | 31 ++
.../apache/sysml/utils/PersistentLRUCache.java | 518 +++++++++++++++++++
2 files changed, 549 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index dcf4595..c0ce6c9 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -21,6 +21,7 @@ package org.apache.sysml.api.mlcontext;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
@@ -74,6 +75,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject;
import org.apache.sysml.runtime.instructions.cp.IntObject;
import org.apache.sysml.runtime.instructions.cp.StringObject;
import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -118,6 +120,35 @@ public final class MLContextUtil {
@SuppressWarnings("rawtypes")
public static final Class[] ALL_SUPPORTED_DATA_TYPES = (Class[]) ArrayUtils.addAll(BASIC_DATA_TYPES,
COMPLEX_DATA_TYPES);
+
+ /**
+ * Utility method to write an output as rectangular blocked RDD
+ *
+ * @param spark spark session
+ * @param dmlScript script that generates the outVariable
+ * @param outVariable variable name
+ * @param outFilePath output file path
+ * @param rowsPerBlock number of rows per block
+ * @param colsPerBlock number of columns per block
+ * @throws IOException if error occurs
+ */
+ public static void reblockAndWrite(SparkSession spark, String dmlScript, String outVariable, String outFilePath, int rowsPerBlock, int colsPerBlock) throws IOException {
+ MLContext ml = new MLContext(spark);
+ Script helloScript = org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript).out(outVariable);
+ MLResults res = ml.execute(helloScript);
+ JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = res.getMatrix(outVariable).toBinaryBlocks();
+ MatrixCharacteristics mc = res.getMatrix(outVariable).getMatrixMetadata().asMatrixCharacteristics();
+ MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
+ mcOut.setRowsPerBlock(rowsPerBlock);
+ mcOut.setColsPerBlock(colsPerBlock);
+ JavaPairRDD<MatrixIndexes, MatrixBlock> out = org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils.mergeByKey(rdd.flatMapToPair(
+ new org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBinaryReblock(mc, mcOut)), false);
+ out.saveAsHadoopFile(outFilePath, MatrixIndexes.class, MatrixBlock.class, org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+ org.apache.sysml.runtime.util.MapReduceTool.writeMetaDataFile(outFilePath + ".mtd",
+ org.apache.sysml.parser.Expression.ValueType.DOUBLE, mcOut,
+ org.apache.sysml.runtime.matrix.data.OutputInfo.BinaryBlockOutputInfo,
+ new org.apache.sysml.runtime.io.FileFormatProperties());
+ }
/**
* Compare two version strings (ie, "1.4.0" and "1.4.1").
http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
new file mode 100644
index 0000000..83a0dcf
--- /dev/null
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.SoftReference;
+import java.nio.file.Files;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
+import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+
+/**
+ * Simple utility to store double[], float[] and MatrixBlock in-memory.
+ * It is designed to guard against OOM by using soft reference as well as max capacity.
+ * When memory is full or if capacity is exceeded, SimplePersistingCache stores the least recently used values into the local filesystem.
+ * Assumption: GC occurs before an OutOfMemoryException, and GC requires prior finalize call.
+ *
+ * The user should use custom put and get methods:
+ * - put(String key, double[] value);
+ * - put(String key, float[] value);
+ * - put(String key, MatrixBlock value);
+ * - double [] getAsDoubleArray(String key);
+ * - float [] getAsFloatArray(String key);
+ * - MatrixBlock getAsMatrixBlock(String key);
+ *
+ * Additionally, the user can also use standard Map methods:
+ * - void clear();
+ * - boolean containsKey(String key)
+ * - remove(String key);
+ *
+ * Instead of using generic types i.e. LinkedHashMap<String, ?>, we are allowing the cache to store values of different types.
+ * ValueWrapper is a container in this case to store the actual values (i.e. double[]. float[] or MatrixBlock).
+ *
+ * The cache can be used in two modes:
+ * - Read-only mode (only applicable for MatrixBlock keys):
+ * = We delete the value when capacity is exceeded or when GC occurs.
+ * = When get is invoked on the deleted key, the key is treated as the full path and MatrixBlock is read from that path.
+ * = Note: in the current version, the metadata file is ignored and the file-format is assumed to be binary-block. We can extend this later.
+ * - General case:
+ * = We persist the values to the file system (into temporary directory) when capacity is exceeded or when GC occurs.
+ * = When get is invoked on the deleted key, the key is treated as the file name (not the absolute path) and MatrixBlock is read from that path.
+ *
+ * This class does not assume minimum capacity and hence only soft references.
+ *
+ * To test this class, please use the below command:
+ * java -cp systemml-*-standalone.jar:commons-lang3-3.8.jar org.apache.sysml.utils.PersistentLRUCache.
+ */
+public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
+ static final Log LOG = LogFactory.getLog(PersistentLRUCache.class.getName());
+ private static final long serialVersionUID = -6838798881747433047L;
+ private String _prefixFilePath;
+ final AtomicLong _currentNumBytes = new AtomicLong();
+ private final long _maxNumBytes;
+ Random _rand = new Random();
+ boolean isInReadOnlyMode;
+
+ public static void main(String [] args) throws IOException {
+ org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
+ double numBytesInMB = 1e+7;
+ int numDoubleInMB = (int) (numBytesInMB / 8);
+ PersistentLRUCache cache = new PersistentLRUCache((long)(numBytesInMB*25));
+ for(int i = 0; i < 30; ++i) {
+ LOG.debug("Putting a double array of size 1MB.");
+ cache.put("file_" + i, new double[numDoubleInMB]);
+ }
+ cache.clear();
+ }
+
+ /**
+ * When enabled, the cache will discard the values instead of writing it to the local file system.
+ *
+ * @return this
+ */
+ public PersistentLRUCache enableReadOnlyMode(boolean enable) {
+ isInReadOnlyMode = enable;
+ return this;
+ }
+
+ /**
+ * Creates a persisting cache
+ * @param maxNumBytes maximum capacity in bytes
+ * @throws IOException if unable to create a temporary directory on the local file system
+ */
+ public PersistentLRUCache(long maxNumBytes) throws IOException {
+ _maxNumBytes = maxNumBytes;
+ File tmp = Files.createTempDirectory("systemml_" + Math.abs(_rand.nextLong())).toFile();
+ tmp.deleteOnExit();
+ _prefixFilePath = tmp.getAbsolutePath();
+ }
+ public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException {
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Double.BYTES);
+ }
+ public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException {
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Float.BYTES);
+ }
+ public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException {
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize());
+ }
+
+ private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException {
+ ValueWrapper prev = null;
+ if(containsKey(key))
+ prev = remove(key);
+ ensureCapacity(sizeInBytes);
+ super.put(key, value);
+ return prev;
+ }
+
+ @Override
+ public ValueWrapper remove(Object key) {
+ ValueWrapper prev = super.remove(key);
+ if(prev != null) {
+ long size = prev.getSize();
+ if(size > 0)
+ _currentNumBytes.addAndGet(-size);
+ prev.remove();
+ }
+ return prev;
+ }
+
+ @Override
+ public ValueWrapper put(String key, ValueWrapper value) {
+ // super.put(key, value);
+ throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ? extends ValueWrapper> m) {
+ // super.putAll(m);
+ throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+ }
+
+ @Override
+ public ValueWrapper get(Object key) {
+ // return super.get(key);
+ throw new DMLRuntimeException("Incorrect usage: Use getAsDoubleArray, getAsFloatArray or getAsMatrixBlock instead.");
+ }
+
+ void makeRecent(String key) {
+ // super.get(key); // didn't work.
+ ValueWrapper value = super.get(key);
+ super.remove(key);
+ super.put(key, value);
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ _currentNumBytes.set(0);
+ File tmp;
+ try {
+ tmp = Files.createTempDirectory("systemml_" + Math.abs(_rand.nextLong())).toFile();
+ tmp.deleteOnExit();
+ _prefixFilePath = tmp.getAbsolutePath();
+ } catch (IOException e) {
+ throw new RuntimeException("Error occured while creating the temp directory.", e);
+ }
+ }
+
+ Map.Entry<String, ValueWrapper> _eldest;
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, ValueWrapper> eldest) {
+ _eldest = eldest;
+ return false; // Never ask LinkedHashMap to remove eldest entry, instead do that in ensureCapacity.
+ }
+
+ float [] tmp = new float[0];
+ String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
+ void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException {
+ if(newNumBytes > _maxNumBytes) {
+ throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes +
+ " bytes as max capacity is " + _maxNumBytes + " bytes.");
+ }
+ long newCapacity = _currentNumBytes.addAndGet(newNumBytes);
+ if(newCapacity > _maxNumBytes) {
+ synchronized(this) {
+ if(LOG.isDebugEnabled())
+ LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes);
+ ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this));
+ int maxIter = size();
+ while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) {
+ super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest
+ remove(dummyKey);
+ if(_eldest != null && _eldest.getKey() != dummyKey) {
+ DataWrapper data = _eldest.getValue().get();
+ if(data != null) {
+ data.write(false); // Write the eldest entry to disk if not garbage collected.
+ }
+ makeRecent(_eldest.getKey()); // Make recent.
+ }
+ maxIter--;
+ }
+ }
+ }
+ }
+
+// public void put(String key, MatrixObject value) {
+// _globalMap.put(key, new ValueWrapper(new DataWrapper(key, value, this)));
+// }
+
+ String getFilePath(String key) {
+ return _prefixFilePath + File.separator + key;
+ }
+
+ public double [] getAsDoubleArray(String key) throws FileNotFoundException, IOException {
+ ValueWrapper value = super.get(key);
+ if(!value.isAvailable()) {
+ // Fine-grained synchronization: only one read per key, but will allow parallel loading
+ // of distinct keys.
+ synchronized(value._lock) {
+ if(!value.isAvailable()) {
+ value.update(DataWrapper.loadDoubleArr(key, this));
+ }
+ }
+ }
+ DataWrapper ret = value.get();
+ if(ret == null)
+ throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in SimplePersistingCache.");
+ return ret._dArr;
+ }
+
+ public float [] getAsFloatArray(String key) throws FileNotFoundException, IOException {
+ ValueWrapper value = super.get(key);
+ if(!value.isAvailable()) {
+ // Fine-grained synchronization: only one read per key, but will allow parallel loading
+ // of distinct keys.
+ synchronized(value._lock) {
+ if(!value.isAvailable()) {
+ value.update(DataWrapper.loadFloatArr(key, this));
+ }
+ }
+ }
+ DataWrapper ret = value.get();
+ if(ret == null)
+ throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in SimplePersistingCache.");
+ return ret._fArr;
+ }
+
+ public MatrixBlock getAsMatrixBlock(String key) throws FileNotFoundException, IOException {
+ ValueWrapper value = super.get(key);
+ if(!value.isAvailable()) {
+ // Fine-grained synchronization: only one read per key, but will allow parallel loading
+ // of distinct keys.
+ synchronized(value._lock) {
+ if(!value.isAvailable()) {
+ value.update(DataWrapper.loadMatrixBlock(key, this, value._rlen, value._clen, value._nnz));
+ }
+ }
+ }
+ DataWrapper ret = value.get();
+ if(ret == null)
+ throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in SimplePersistingCache.");
+ return ret._mb;
+ }
+}
+
+// ----------------------------------------------------------------------------------------
+// Internal helper class
+class DataWrapper {
+ double [] _dArr;
+ float [] _fArr;
+ MatrixBlock _mb;
+ MatrixObject _mo;
+ final PersistentLRUCache _cache;
+ final String _key;
+ DataWrapper(String key, double [] value, PersistentLRUCache cache) {
+ _key = key;
+ _dArr = value;
+ _fArr = null;
+ _mb = null;
+ _mo = null;
+ _cache = cache;
+ }
+ DataWrapper(String key, float [] value, PersistentLRUCache cache) {
+ _key = key;
+ _dArr = null;
+ _fArr = value;
+ _mb = null;
+ _mo = null;
+ _cache = cache;
+ }
+ DataWrapper(String key, MatrixBlock value, PersistentLRUCache cache) {
+ _key = key;
+ _dArr = null;
+ _fArr = null;
+ _mb = value;
+ _mo = null;
+ _cache = cache;
+ }
+ DataWrapper(String key, MatrixObject value, PersistentLRUCache cache) {
+ _key = key;
+ _dArr = null;
+ _fArr = null;
+ _mb = null;
+ _mo = value;
+ _cache = cache;
+ }
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ write(true);
+ }
+
+ public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException {
+ if(_key.equals(_cache.dummyKey))
+ return;
+ _cache.makeRecent(_key); // Make it recent.
+
+ if(_dArr != null || _fArr != null || _mb != null || _mo != null) {
+ _cache._currentNumBytes.addAndGet(-getSize());
+ }
+
+ if(!_cache.isInReadOnlyMode) {
+ String debugSuffix = null;
+ if(PersistentLRUCache.LOG.isDebugEnabled()) {
+ if(isBeingGarbageCollected)
+ debugSuffix = " (is being garbage collected).";
+ else
+ debugSuffix = " (capacity exceeded).";
+ }
+
+ if(_dArr != null) {
+ try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+ os.writeInt(_dArr.length);
+ for(int i = 0; i < _dArr.length; i++) {
+ os.writeDouble(_dArr[i]);
+ }
+ }
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+ }
+ else if(_fArr != null) {
+ try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+ os.writeInt(_fArr.length);
+ for(int i = 0; i < _fArr.length; i++) {
+ os.writeFloat(_fArr[i]);
+ }
+ }
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+ }
+ else if(_mb != null) {
+ try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) {
+ os.writeLong(_mb.getInMemorySize());
+ _mb.write(os);
+ }
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Writing value (MatrixBlock of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+ }
+ else if(_mo != null) {
+ throw new DMLRuntimeException("Not implemented");
+ }
+ else {
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Skipping writing of the key " + _key + " to disk as the value is already written" + debugSuffix);
+ }
+ }
+ _dArr = null; _fArr = null; _mb = null; _mo = null;
+ }
+
+ static DataWrapper loadDoubleArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+ if(cache.isInReadOnlyMode)
+ throw new IOException("Read-only mode is only supported for MatrixBlock.");
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Loading double array the key " + key + " from the disk.");
+ double [] ret;
+ try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+ int size = is.readInt();
+ cache.ensureCapacity(size*Double.BYTES);
+ ret = new double[size];
+ for(int i = 0; i < size; i++) {
+ ret[i] = is.readDouble();
+ }
+ }
+ return new DataWrapper(key, ret, cache);
+ }
+
+ static DataWrapper loadFloatArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+ if(cache.isInReadOnlyMode)
+ throw new IOException("Read-only mode is only supported for MatrixBlock.");
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Loading float array the key " + key + " from the disk.");
+ float [] ret;
+ try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+ int size = is.readInt();
+ cache.ensureCapacity(size*Float.BYTES);
+ ret = new float[size];
+ for(int i = 0; i < size; i++) {
+ ret[i] = is.readFloat();
+ }
+ }
+ return new DataWrapper(key, ret, cache);
+ }
+
+ static DataWrapper loadMatrixBlock(String key,
+ PersistentLRUCache cache, long rlen, long clen, long nnz) throws FileNotFoundException, IOException {
+ if(PersistentLRUCache.LOG.isDebugEnabled())
+ PersistentLRUCache.LOG.debug("Loading matrix block array the key " + key + " from the disk.");
+ MatrixBlock ret = null;
+ if(cache.isInReadOnlyMode) {
+ // Read from the filesystem in the read-only mode assuming binary-blocked format.
+ // TODO: Read the meta-data file and remove the format requirement.
+ ret = DataConverter.readMatrixFromHDFS(key,
+ org.apache.sysml.runtime.matrix.data.InputInfo.BinaryBlockInputInfo, rlen, clen,
+ ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), nnz,
+ new org.apache.sysml.runtime.io.FileFormatProperties());
+ }
+ else {
+ try (FastBufferedDataInputStream is = new FastBufferedDataInputStream(new ObjectInputStream(new FileInputStream(cache.getFilePath(key))))) {
+ long size = is.readLong();
+ cache.ensureCapacity(size);
+ ret = new MatrixBlock();
+ ret.readFields(is);
+ }
+ }
+ return new DataWrapper(key, ret, cache);
+ }
+
+ void remove() {
+ File file = new File(_cache.getFilePath(_key));
+ if(file.exists()) {
+ file.delete();
+ }
+ }
+
+ long getSize() {
+ if(_dArr != null)
+ return _dArr.length*Double.BYTES;
+ else if(_fArr != null)
+ return _fArr.length*Float.BYTES;
+ else if(_fArr != null)
+ return _mb.getInMemorySize();
+ else
+ throw new DMLRuntimeException("Not implemented");
+ }
+
+}
+
+// Internal helper class
+class ValueWrapper {
+ final Object _lock;
+ private SoftReference<DataWrapper> _ref;
+ long _rlen;
+ long _clen;
+ long _nnz;
+
+ ValueWrapper(DataWrapper _data) {
+ _lock = new Object();
+ _ref = new SoftReference<>(_data);
+ if(_data._mb != null) {
+ _rlen = _data._mb.getNumRows();
+ _clen = _data._mb.getNumColumns();
+ _nnz = _data._mb.getNonZeros();
+ }
+ }
+ void update(DataWrapper _data) {
+ _ref = new SoftReference<>(_data);
+ if(_data._mb != null) {
+ _rlen = _data._mb.getNumRows();
+ _clen = _data._mb.getNumColumns();
+ _nnz = _data._mb.getNonZeros();
+ }
+ }
+ boolean isAvailable() {
+ return _ref.get() != null;
+ }
+ DataWrapper get() {
+ return _ref.get();
+ }
+ long getSize() {
+ DataWrapper data = _ref.get();
+ if(data != null)
+ return data.getSize();
+ else
+ return 0;
+ }
+ void remove() {
+ DataWrapper data = _ref.get();
+ if(data != null) {
+ data.remove();
+ }
+ }
+}
+