You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/09/20 17:45:50 UTC
systemml git commit: [SYSTEMML-445] Write to disk when the cache is
used in the write-mode
Repository: systemml
Updated Branches:
refs/heads/master 3fbfbaecb -> 69f2d377c
[SYSTEMML-445] Write to disk when the cache is used in the write-mode
- This avoids the need to depend on finalize to perform writing.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/69f2d377
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/69f2d377
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/69f2d377
Branch: refs/heads/master
Commit: 69f2d377c456f9baea1e248818d544b54ee00e6f
Parents: 3fbfbae
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Thu Sep 20 10:44:27 2018 -0700
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Thu Sep 20 10:44:27 2018 -0700
----------------------------------------------------------------------
.../apache/sysml/utils/PersistentLRUCache.java | 100 ++++++++++++-------
1 file changed, 64 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/69f2d377/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
index bf356bb..71a1e28 100644
--- a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -86,7 +86,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
private String _prefixFilePath;
final AtomicLong _currentNumBytes = new AtomicLong();
private final long _maxNumBytes;
- Random _rand = new Random();
+ private static final Random _rand = new Random();
boolean isInReadOnlyMode;
HashSet<String> persistedKeys = new HashSet<>();
@@ -101,6 +101,9 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
for(long i = 0; i < numIter; ++i) {
LOG.debug("Putting a double array of size 50MB.");
cache.put("file_" + i, new double[numDoubleIn50MB]);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
}
cache.clear();
}
@@ -127,13 +130,13 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
_prefixFilePath = tmp.getAbsolutePath();
}
public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException {
- return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Double.BYTES);
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Double.BYTES);
}
public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException {
- return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Float.BYTES);
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Float.BYTES);
}
public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException {
- return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize());
+ return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.getInMemorySize());
}
private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException {
@@ -206,7 +209,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
}
float [] tmp = new float[0];
- String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
+ static String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException {
if(newNumBytes > _maxNumBytes) {
throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes +
@@ -217,7 +220,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
synchronized(this) {
if(LOG.isDebugEnabled())
LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes);
- ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this));
+ ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this), isInReadOnlyMode);
int maxIter = size();
while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) {
super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest
@@ -348,17 +351,13 @@ class DataWrapper {
_mo = value;
_cache = cache;
}
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- write(true);
- }
- public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException {
- if(_key.equals(_cache.dummyKey))
+ public synchronized void write(boolean forceAggresiveWrites) throws FileNotFoundException, IOException {
+ if(_key.equals(PersistentLRUCache.dummyKey))
return;
- _cache.makeRecent(_key); // Make it recent.
+ // Prepare for writing
+ _cache.makeRecent(_key); // Make it recent.
if(_dArr != null || _fArr != null || _mb != null || _mo != null) {
_cache._currentNumBytes.addAndGet(-getSize());
}
@@ -366,14 +365,16 @@ class DataWrapper {
if(!_cache.isInReadOnlyMode) {
String debugSuffix = null;
if(PersistentLRUCache.LOG.isDebugEnabled()) {
- if(isBeingGarbageCollected)
- debugSuffix = " (is being garbage collected).";
+ if(forceAggresiveWrites)
+ debugSuffix = " (aggressively written).";
else
debugSuffix = " (capacity exceeded).";
}
if(_dArr != null) {
- try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+ File file = new File(_cache.getFilePath(_key));
+ file.deleteOnExit();
+ try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
os.writeInt(_dArr.length);
for(int i = 0; i < _dArr.length; i++) {
os.writeDouble(_dArr[i]);
@@ -384,7 +385,9 @@ class DataWrapper {
PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
}
else if(_fArr != null) {
- try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+ File file = new File(_cache.getFilePath(_key));
+ file.deleteOnExit();
+ try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
os.writeInt(_fArr.length);
for(int i = 0; i < _fArr.length; i++) {
os.writeFloat(_fArr[i]);
@@ -395,7 +398,9 @@ class DataWrapper {
PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
}
else if(_mb != null) {
- try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) {
+ File file = new File(_cache.getFilePath(_key));
+ file.deleteOnExit();
+ try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(file)))) {
os.writeLong(_mb.getInMemorySize());
_mb.write(os);
}
@@ -508,44 +513,67 @@ class DataWrapper {
// Internal helper class
class ValueWrapper {
final Object _lock;
- private SoftReference<DataWrapper> _ref;
+ final boolean _isInReadOnlyMode;
+ private SoftReference<DataWrapper> _softRef;
long _rlen;
long _clen;
long _nnz;
- ValueWrapper(DataWrapper _data) {
+ // Held only in write-mode, and only until the write to disk has completed.
+ // It keeps the DataWrapper referenced by _softRef from being garbage collected while it is being written.
+ volatile DataWrapper _strongRef;
+
+ ValueWrapper(DataWrapper data, boolean isInReadOnlyMode) {
_lock = new Object();
- _ref = new SoftReference<>(_data);
- if(_data._mb != null) {
- _rlen = _data._mb.getNumRows();
- _clen = _data._mb.getNumColumns();
- _nnz = _data._mb.getNonZeros();
+ _isInReadOnlyMode = isInReadOnlyMode;
+ boolean isDummyValue = (data._key == PersistentLRUCache.dummyKey);
+ if(!_isInReadOnlyMode && !isDummyValue) {
+ // Aggressive write to disk when the cache is used in the write-mode.
+ // This avoids the need to depend on finalize to perform writing.
+ _strongRef = data;
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ _strongRef.write(true);
+ _strongRef = null; // Reset the strong reference after aggressive writing
+ } catch (IOException e) {
+ throw new DMLRuntimeException("Error occured while aggressively writing the value to disk.", e);
+ }
+ }
+ };
+ t.start();
+ }
+ _softRef = new SoftReference<>(data);
+ if(data._mb != null) {
+ _rlen = data._mb.getNumRows();
+ _clen = data._mb.getNumColumns();
+ _nnz = data._mb.getNonZeros();
}
}
- void update(DataWrapper _data) {
- _ref = new SoftReference<>(_data);
- if(_data._mb != null) {
- _rlen = _data._mb.getNumRows();
- _clen = _data._mb.getNumColumns();
- _nnz = _data._mb.getNonZeros();
+ void update(DataWrapper data) {
+ _softRef = new SoftReference<>(data);
+ if(data._mb != null) {
+ _rlen = data._mb.getNumRows();
+ _clen = data._mb.getNumColumns();
+ _nnz = data._mb.getNonZeros();
}
}
boolean isAvailable() {
- DataWrapper data = _ref.get();
+ DataWrapper data = _softRef.get();
return data != null && data.isAvailable();
}
DataWrapper get() {
- return _ref.get();
+ return _softRef.get();
}
long getSize() {
- DataWrapper data = _ref.get();
+ DataWrapper data = _softRef.get();
if(data != null)
return data.getSize();
else
return 0;
}
void remove() {
- DataWrapper data = _ref.get();
+ DataWrapper data = _softRef.get();
if(data != null) {
data.remove();
}