You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/03/20 20:35:10 UTC

[systemds] branch main updated: [MINOR] Extended DML config for federated worker parallelism

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ee2ef2  [MINOR] Extended DML config for federated worker parallelism
4ee2ef2 is described below

commit 4ee2ef2b20095eca35dd5af8acee4d747a17c32e
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun Mar 20 21:34:14 2022 +0100

    [MINOR] Extended DML config for federated worker parallelism
    
    sysds.federated.par_conn (number of concurrent connections in event loop)
    sysds.federated.par_inst (degree of parallelism of federated worker instructions)
    
    For both, if the value is <=0, we use by default the number of virtual
    cores as reported by the JVM.
---
 conf/SystemDS-config.xml.template                                  | 6 ++++++
 src/main/java/org/apache/sysds/conf/DMLConfig.java                 | 4 ++++
 .../java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java  | 2 ++
 .../sysds/runtime/controlprogram/federated/FederatedWorker.java    | 4 +++-
 .../runtime/controlprogram/federated/FederatedWorkerHandler.java   | 7 +++++--
 5 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/conf/SystemDS-config.xml.template b/conf/SystemDS-config.xml.template
index 6893f63..88658b4 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -118,6 +118,12 @@
     <!-- set the federated plan generator (none, [runtime], compile_fed_all, compile_fed_heuristic, compile_cost_based) -->
     <sysds.federated.planner>runtime</sysds.federated.planner>
 
+    <!-- set the degree of parallelism of the federated worker event loop (<=0 means number of virtual cores) -->
+    <sysds.federated.par_conn>0</sysds.federated.par_conn>
+
+    <!-- set the degree of parallelism of the federated worker instructions (<=0 means number of virtual cores) -->
+    <sysds.federated.par_inst>0</sysds.federated.par_inst>
+
     <!-- set buffer pool threshold (max size) in % of total heap -->
     <sysds.caching.bufferpoollimit>15</sysds.caching.bufferpoollimit>
 
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 2511749..a870644 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -116,6 +116,8 @@ public class DMLConfig
 	public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = "sysds.federated.initialization.timeout"; // int seconds
 	public static final String FEDERATED_TIMEOUT = "sysds.federated.timeout"; // single request timeout default -1 to indicate infinite.
 	public static final String FEDERATED_PLANNER = "sysds.federated.planner";
+	public static final String FEDERATED_PAR_INST = "sysds.federated.par_inst";
+	public static final String FEDERATED_PAR_CONN = "sysds.federated.par_conn";
 	public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port
 	public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
 	
@@ -181,6 +183,8 @@ public class DMLConfig
 		_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10");
 		_defaultVals.put(FEDERATED_TIMEOUT,      "-1");
 		_defaultVals.put(FEDERATED_PLANNER,      FederatedPlanner.RUNTIME.name());
+		_defaultVals.put(FEDERATED_PAR_CONN,     "-1"); // vcores
+		_defaultVals.put(FEDERATED_PAR_INST,     "-1"); // vcores
 	}
 	
 	public DMLConfig() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
index 7a63ca4..2abe9f2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
@@ -28,6 +28,8 @@ import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public abstract class ASDCZero extends APreAgg {
+	private static final long serialVersionUID = -69266306137398807L;
+	
 	/** Sparse row indexes for the data */
 	protected AOffset _indexes;
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index bef4032..038fb68 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -44,6 +44,7 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.log4j.Logger;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig;
 
 public class FederatedWorker {
@@ -67,7 +68,8 @@ public class FederatedWorker {
 
 	public void run() throws CertificateException, SSLException {
 		log.info("Setting up Federated Worker on port " + _port);
-		final int EVENT_LOOP_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 4);
+		int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
+		final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
 		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
 		ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
 			10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 1cbfb5a..9dfd30c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -37,6 +37,7 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
@@ -48,6 +49,7 @@ import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.Instruction.IType;
 import org.apache.sysds.runtime.instructions.InstructionParser;
@@ -436,8 +438,9 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 
 		// set the number of threads according to the number of processors on the federated worker
 		if(receivedInstruction.getOperator() instanceof MultiThreadedOperator) {
-			int numProcessors = Runtime.getRuntime().availableProcessors();
-			((MultiThreadedOperator)receivedInstruction.getOperator()).setNumThreads(numProcessors);
+			int par_inst = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_INST);
+			((MultiThreadedOperator)receivedInstruction.getOperator())
+				.setNumThreads((par_inst > 0) ? par_inst : InfrastructureAnalyzer.getLocalParallelism());
 		}
 
 		BasicProgramBlock pb = new BasicProgramBlock(null);