You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/09/28 19:17:41 UTC

[systemds] branch main updated: [SYSTEMDS-3439] Fix missing config for federated read cache

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 952c1d0896 [SYSTEMDS-3439] Fix missing config for federated read cache
952c1d0896 is described below

commit 952c1d08968412edad86d84fca0f0ab2a2871664
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Wed Sep 28 20:57:18 2022 +0200

    [SYSTEMDS-3439] Fix missing config for federated read cache
    
    With the federated read cache for multi-tenant and cross-session reuse,
    updates to files that were already read are invisible to a long-running
    federated worker. We preserve the assumption that individual coordinator
    sessions run over a consistent snapshot of the data, but now allow the
    worker to be configured with or without the read cache.
---
 conf/SystemDS-config.xml.template                                | 3 +++
 src/main/java/org/apache/sysds/conf/ConfigurationManager.java    | 4 ++++
 src/main/java/org/apache/sysds/conf/DMLConfig.java               | 2 ++
 .../runtime/controlprogram/federated/FederatedWorkerHandler.java | 9 ++++++---
 4 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/conf/SystemDS-config.xml.template b/conf/SystemDS-config.xml.template
index b799d5d7ce..6e51cb047f 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -130,6 +130,9 @@
     <!-- set the degree of parallelism of the federated worker instructions (<=0 means number of virtual cores) -->
     <sysds.federated.par_inst>0</sysds.federated.par_inst>
 
+    <!-- enables the federated read cache for multi-tenancy / cross-session reuse -->
+    <sysds.federated.readcache>true</sysds.federated.readcache>
+
     <!-- set buffer pool threshold (max size) in % of total heap -->
     <sysds.caching.bufferpoollimit>15</sysds.caching.bufferpoollimit>
 
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 936b6eabda..9e379bc09a 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -226,6 +226,10 @@ public class ConfigurationManager
 	public static boolean isFederatedSSL(){
 		return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION);
 	}
+	
+	public static boolean isFederatedReadCacheEnabled(){
+		return getDMLConfig().getBooleanValue(DMLConfig.FEDERATED_READCACHE);
+	}
 
 	///////////////////////////////////////
 	// Thread-local classes
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 5364299415..dd2d3a15ba 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -121,6 +121,7 @@ public class DMLConfig
 	public static final String FEDERATED_PLANNER = "sysds.federated.planner";
 	public static final String FEDERATED_PAR_INST = "sysds.federated.par_inst";
 	public static final String FEDERATED_PAR_CONN = "sysds.federated.par_conn";
+	public static final String FEDERATED_READCACHE = "sysds.federated.readcache";
 	public static final String PRIVACY_CONSTRAINT_MOCK = "sysds.federated.priv_mock";
 	public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port
 	public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 8;
@@ -192,6 +193,7 @@ public class DMLConfig
 		_defaultVals.put(FEDERATED_PLANNER,      FederatedPlanner.RUNTIME.name());
 		_defaultVals.put(FEDERATED_PAR_CONN,     "-1"); // vcores
 		_defaultVals.put(FEDERATED_PAR_INST,     "-1"); // vcores
+		_defaultVals.put(FEDERATED_READCACHE,    "true"); // vcores
 		_defaultVals.put(PRIVACY_CONSTRAINT_MOCK, null);
 	}
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 4ddf23a1d3..574245da20 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -358,16 +358,19 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		final String sId = String.valueOf(id);
 
 		boolean linReuse = (!ReuseCacheType.isNone() && dataType == DataType.MATRIX);
+		boolean readCache = ConfigurationManager.isFederatedReadCacheEnabled();
 		if(!linReuse || !LineageCache.reuseFedRead(sId, dataType, linItem, ec)) {
 			// Lookup read cache if reuse is disabled and we skipped storing in the
 			// lineage cache due to other constraints
-			cd = _frc.get(filename, !linReuse);
+			cd = _frc.get(filename, readCache & !linReuse);
 			try {
 				if(cd == null) { // data is neither in lineage cache nor in read cache
-					cd = localBlock == null ? readDataNoReuse(filename, dataType, mc) : ExecutionContext.createCacheableData(localBlock); // actual read of the data
+					cd = localBlock == null ?
+						readDataNoReuse(filename, dataType, mc) :
+						ExecutionContext.createCacheableData(localBlock); // actual read of the data
 					if(linReuse) // put the object into the lineage cache
 						LineageCache.putFedReadObject(cd, linItem, ec);
-					else
+					else if( readCache )
 						_frc.setData(filename, cd); // set the data into the read cache entry
 				}
 				ec.setVariable(sId, cd);