You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/09/20 17:45:50 UTC

systemml git commit: [SYSTEMML-445] Write to disk when the cache is used in the write-mode

Repository: systemml
Updated Branches:
  refs/heads/master 3fbfbaecb -> 69f2d377c


[SYSTEMML-445] Write to disk when the cache is used in the write-mode

- This avoids the need to depend on finalize to perform writing.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/69f2d377
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/69f2d377
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/69f2d377

Branch: refs/heads/master
Commit: 69f2d377c456f9baea1e248818d544b54ee00e6f
Parents: 3fbfbae
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Thu Sep 20 10:44:27 2018 -0700
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Thu Sep 20 10:44:27 2018 -0700

----------------------------------------------------------------------
 .../apache/sysml/utils/PersistentLRUCache.java  | 100 ++++++++++++-------
 1 file changed, 64 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/69f2d377/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
index bf356bb..71a1e28 100644
--- a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -86,7 +86,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
 	private String _prefixFilePath;
 	final AtomicLong _currentNumBytes = new AtomicLong();
 	private final long _maxNumBytes;
-	Random _rand = new Random();
+	private static final Random _rand = new Random();
 	boolean isInReadOnlyMode;
 	HashSet<String> persistedKeys = new HashSet<>();
 	
@@ -101,6 +101,9 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
 		for(long i = 0; i < numIter; ++i) {
 			LOG.debug("Putting a double array of size 50MB.");
 			cache.put("file_" + i, new double[numDoubleIn50MB]);
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {}
 		}
 		cache.clear();
 	}
@@ -127,13 +130,13 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
 		_prefixFilePath = tmp.getAbsolutePath();
 	}
 	public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException {
-		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Double.BYTES);
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Double.BYTES);
 	}
 	public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException {
-		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Float.BYTES);
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Float.BYTES);
 	}
 	public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException {
-		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize());
+		return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.getInMemorySize());
 	}
 	
 	private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException {
@@ -206,7 +209,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
     }
 	
 	float [] tmp = new float[0];
-	String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
+	static String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
 	void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException {
 		if(newNumBytes > _maxNumBytes) {
 			throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes + 
@@ -217,7 +220,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
 			synchronized(this) {
 				if(LOG.isDebugEnabled())
 					LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes);
-				ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this));
+				ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this), isInReadOnlyMode);
 				int maxIter = size();
 				while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) {
 					super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest
@@ -348,17 +351,13 @@ class DataWrapper {
 		_mo = value;
 		_cache = cache;
 	}
-	@Override
-	protected void finalize() throws Throwable {
-		super.finalize();
-		write(true);
-	}
 	
-	public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException {
-		if(_key.equals(_cache.dummyKey))
+	public synchronized void write(boolean forceAggresiveWrites) throws FileNotFoundException, IOException {
+		if(_key.equals(PersistentLRUCache.dummyKey))
 			return;
-		_cache.makeRecent(_key); // Make it recent.
 		
+		// Prepare for writing
+		_cache.makeRecent(_key); // Make it recent.
 		if(_dArr != null || _fArr != null || _mb != null || _mo != null) {
 			_cache._currentNumBytes.addAndGet(-getSize());
 		}
@@ -366,14 +365,16 @@ class DataWrapper {
 		if(!_cache.isInReadOnlyMode) {
 			String debugSuffix = null;
 			if(PersistentLRUCache.LOG.isDebugEnabled()) {
-				if(isBeingGarbageCollected)
-					debugSuffix = " (is being garbage collected).";
+				if(forceAggresiveWrites)
+					debugSuffix = " (aggressively written).";
 				else
 					debugSuffix = " (capacity exceeded).";
 			}
 			
 			if(_dArr != null) {
-				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+				File file = new File(_cache.getFilePath(_key));
+				file.deleteOnExit();
+				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
 					os.writeInt(_dArr.length);
 					for(int i = 0; i < _dArr.length; i++) {
 						os.writeDouble(_dArr[i]);
@@ -384,7 +385,9 @@ class DataWrapper {
 					PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
 			}
 			else if(_fArr != null) {
-				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+				File file = new File(_cache.getFilePath(_key));
+				file.deleteOnExit();
+				try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
 					os.writeInt(_fArr.length);
 					for(int i = 0; i < _fArr.length; i++) {
 						os.writeFloat(_fArr[i]);
@@ -395,7 +398,9 @@ class DataWrapper {
 					PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
 			}
 			else if(_mb != null) {
-				try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) {
+				File file = new File(_cache.getFilePath(_key));
+				file.deleteOnExit();
+				try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(file)))) {
 					os.writeLong(_mb.getInMemorySize());
 					_mb.write(os);
 				}
@@ -508,44 +513,67 @@ class DataWrapper {
 // Internal helper class
 class ValueWrapper {
 	final Object _lock;
-	private SoftReference<DataWrapper> _ref;
+	final boolean _isInReadOnlyMode;
+	private SoftReference<DataWrapper> _softRef;
 	long _rlen;
 	long _clen;
 	long _nnz;
 	
-	ValueWrapper(DataWrapper _data) {
+	// This is only set in write-mode, and only until the write to disk has completed.
+	// It also keeps the DataWrapper referenced by _softRef from being garbage collected while it is being written.
+	volatile DataWrapper _strongRef; 
+	
+	ValueWrapper(DataWrapper data, boolean isInReadOnlyMode) {
 		_lock = new Object();
-		_ref = new SoftReference<>(_data);
-		if(_data._mb != null) {
-			_rlen = _data._mb.getNumRows();
-			_clen = _data._mb.getNumColumns();
-			_nnz = _data._mb.getNonZeros();
+		_isInReadOnlyMode = isInReadOnlyMode;
+		boolean isDummyValue = (data._key == PersistentLRUCache.dummyKey);
+		if(!_isInReadOnlyMode && !isDummyValue) {
+			// Aggressive write to disk when the cache is used in the write-mode.
+			// This avoids the need to depend on finalize to perform writing.
+			_strongRef = data;
+			Thread t = new Thread() {
+			    public void run() {
+			    	try {
+						_strongRef.write(true);
+						_strongRef = null; // Reset the strong reference after aggressive writing
+					} catch (IOException e) {
+						throw new DMLRuntimeException("Error occured while aggressively writing the value to disk.", e);
+					}
+			    }
+			};
+			t.start();
+		}
+		_softRef = new SoftReference<>(data);
+		if(data._mb != null) {
+			_rlen = data._mb.getNumRows();
+			_clen = data._mb.getNumColumns();
+			_nnz = data._mb.getNonZeros();
 		}
 	}
-	void update(DataWrapper _data) {
-		_ref = new SoftReference<>(_data);
-		if(_data._mb != null) {
-			_rlen = _data._mb.getNumRows();
-			_clen = _data._mb.getNumColumns();
-			_nnz = _data._mb.getNonZeros();
+	void update(DataWrapper data) {
+		_softRef = new SoftReference<>(data);
+		if(data._mb != null) {
+			_rlen = data._mb.getNumRows();
+			_clen = data._mb.getNumColumns();
+			_nnz = data._mb.getNonZeros();
 		}
 	}
 	boolean isAvailable() {
-		DataWrapper data = _ref.get();
+		DataWrapper data = _softRef.get();
 		return data != null && data.isAvailable();
 	}
 	DataWrapper get() {
-		return _ref.get();
+		return _softRef.get();
 	}
 	long getSize() {
-		DataWrapper data = _ref.get();
+		DataWrapper data = _softRef.get();
 		if(data != null) 
 			return data.getSize();
 		else
 			return 0;
 	}
 	void remove() {
-		DataWrapper data = _ref.get();
+		DataWrapper data = _softRef.get();
 		if(data != null) {
 			data.remove();
 		}