You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/24 11:30:44 UTC

[systemds] branch master updated: [SYSTEMDS-2638] Federated stats for Worker

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 68b51d3  [SYSTEMDS-2638] Federated stats for Worker
68b51d3 is described below

commit 68b51d3d668f9e5bbc34f6a0537d10b9b069fe3e
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 24 11:04:09 2020 +0200

    [SYSTEMDS-2638] Federated stats for Worker
    
    Federated workers are now able to output execution statistics if the
    -stats flag is set. The stats are printed at every clear command sent to
    the worker. Each time the stats are printed, they are also reset.
    
    This commit also changes the test setup so that federated workers now run
    in a separate JVM in the test framework. To enable this, the federated
    worker startup wait time has been increased from 0.75 sec to 1 sec.
---
 .github/workflows/functionsTests.yml               |  3 +-
 .../federated/FederatedWorkerHandler.java          | 21 ++++---
 .../java/org/apache/sysds/utils/Statistics.java    |  5 ++
 .../org/apache/sysds/test/AutomatedTestBase.java   | 59 ++++++++++---------
 src/test/java/org/apache/sysds/test/TestUtils.java | 18 ++++++
 .../{ => algorithms}/FederatedBivarTest.java       | 10 ++--
 .../{ => algorithms}/FederatedGLMTest.java         |  6 +-
 .../{ => algorithms}/FederatedKmeansTest.java      |  6 +-
 .../{ => algorithms}/FederatedL2SVMTest.java       |  4 +-
 .../{ => algorithms}/FederatedLogRegTest.java      | 66 ++++++++++++++--------
 .../{ => algorithms}/FederatedPCATest.java         | 10 ++--
 .../{ => algorithms}/FederatedUnivarTest.java      | 10 ++--
 .../{ => algorithms}/FederatedYL2SVMTest.java      |  4 +-
 .../FederatedBinaryMatrixTest.java                 |  6 +-
 .../FederatedBinaryVectorTest.java                 |  6 +-
 .../FederatedConstructionTest.java                 |  4 +-
 .../FederatedMatrixScalarOperationsTest.java       |  2 +-
 .../{ => primitives}/FederatedMultiplyTest.java    |  6 +-
 .../{ => primitives}/FederatedNegativeTest.java    |  2 +-
 .../{ => primitives}/FederatedRCBindTest.java      |  4 +-
 .../{ => primitives}/FederatedSumTest.java         |  4 +-
 .../test/functions/privacy/FederatedL2SVMTest.java |  2 +-
 .../privacy/FederatedWorkerHandlerTest.java        |  8 +--
 .../TransformFederatedEncodeDecodeTest.java        |  2 +-
 24 files changed, 158 insertions(+), 110 deletions(-)

diff --git a/.github/workflows/functionsTests.yml b/.github/workflows/functionsTests.yml
index 63fd85e..7d0349a 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -54,7 +54,8 @@ jobs:
           data.rand,
           data.tensor,
           dnn,
-          federated,
+          federated.algorithms,
+          federated.primitives,
           frame,
           indexing,
           io,
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 0dcb846..b09b75d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -19,14 +19,14 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
@@ -52,11 +52,13 @@ import org.apache.sysds.runtime.privacy.DMLPrivacyException;
 import org.apache.sysds.runtime.privacy.PrivacyMonitor;
 import org.apache.sysds.runtime.privacy.PrivacyPropagator;
 import org.apache.sysds.utils.JSONHelper;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONObject;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.Arrays;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 	protected static Logger log = Logger.getLogger(FederatedWorkerHandler.class);
@@ -110,6 +112,11 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 			else if( response == null && i == requests.length-1 ) {
 				response = tmp; //return last
 			}
+
+			if (DMLScript.STATISTICS && request.getType() == RequestType.CLEAR){
+				System.out.println("Federated Worker " + Statistics.display());
+				Statistics.reset();
+			}
 		}
 		ctx.writeAndFlush(response).addListener(new CloseListener());
 	}
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index b498b0e..8f22ab4 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -501,6 +501,11 @@ public class Statistics
 		nativeConv2dBwdFilterTime = 0;
 		nativeConv2dBwdDataTime = 0;
 		LibMatrixDNN.resetStatistics();
+		federatedReadCount.reset();
+		federatedPutCount.reset();
+		federatedGetCount.reset();
+		federatedExecuteInstructionCount.reset();
+		federatedExecuteUDFCount.reset();
 	}
 
 	public static void resetJITCompileTime(){
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index bfb2e5d..7bca262 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.test;
 
+import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -100,7 +101,7 @@ public abstract class AutomatedTestBase {
 	public static final boolean TEST_GPU = false;
 	public static final double GPU_TOLERANCE = 1e-9;
 
-	public static final int FED_WORKER_WAIT = 750; // in ms
+	public static final int FED_WORKER_WAIT = 1000; // in ms
 
 	// With OpenJDK 8u242 on Windows, the new changes in JDK are not allowing
 	// to set the native library paths internally thus breaking the code.
@@ -1283,38 +1284,40 @@ public abstract class AutomatedTestBase {
 		}
 	}
 
-	protected Thread startLocalFedWorker(int port) {
-		Thread t = null;
-		String[] fedWorkArgs = {"-w", Integer.toString(port)};
-		ArrayList<String> args = new ArrayList<>();
-
-		addProgramIndependentArguments(args);
-
-		for(int i = 0; i < fedWorkArgs.length; i++)
-			args.add(fedWorkArgs[i]);
-
-		String[] finalArguments = args.toArray(new String[args.size()]);
-
-		try {
-			t = new Thread(() -> {
-				try {
-					main(finalArguments);
-				}
-				catch(IOException e) {
-				}
-			});
-			t.start();
-			java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
-		}
-		catch(InterruptedException e) {
-			// Should happen at closing of the worker so don't print
+	/**
+	 * Start new JVM for a federated worker at the port.
+	 * 
+	 * 
+	 * @param port Port to use for the JVM
+	 * @return the process associated with the worker.
+	 */
+	protected Process startLocalFedWorker(int port) {
+		Process process = null;
+		String separator = System.getProperty("file.separator");
+		String classpath = System.getProperty("java.class.path");
+		String path = System.getProperty("java.home")
+					+ separator + "bin" + separator + "java";
+		ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp", 
+				classpath, DMLScript.class.getName(), "-w",  Integer.toString(port), "-stats");
+
+		try{
+			process = processBuilder.start();
+			// Give some time to startup the worker.
+			sleep(FED_WORKER_WAIT);
+		} catch (IOException | InterruptedException e){
+			e.printStackTrace();
 		}
-		return t;
+		return process;
 	}
 
+	/**
+	 * Start java worker in same JVM.
+	 * @param args the command line arguments
+	 * @return the thread associated with the process.s
+	 */
 	public static Thread startLocalFedWorkerWithArgs(String[] args) {
 		Thread t = null;
-
+		
 		try {
 			t = new Thread(() -> {
 				try {
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index d5aea1c..1e38e2c 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -2469,6 +2469,11 @@ public class TestUtils
 		for( Thread t : ts )
 			shutdownThread(t);
 	}
+
+	public static void shutdownThreads(Process... ts) {
+		for( Process t : ts )
+			shutdownThread(t);
+	}
 	
 	public static void shutdownThread(Thread t) {
 		// kill the worker
@@ -2482,6 +2487,19 @@ public class TestUtils
 			}
 		}
 	}
+
+	public static void shutdownThread(Process t) {
+		// kill the worker
+		if( t != null ) {
+			Process d = t.destroyForcibly();
+			try {
+				d.waitFor();
+			}
+			catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+	}
 	
 	public static String federatedAddress(int port, String input) {
 		return federatedAddress("localhost", port, input);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index 906beec..96fef5d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -116,10 +116,10 @@ public class FederatedBivarTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index 009d921..636b279 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -94,8 +94,8 @@ public class FederatedGLMTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index 7b60476..933e971 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,8 +101,8 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index 4cfc70e..4caf52e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,7 +76,7 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
similarity index 76%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index 1c07bfc..fef8889 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -17,12 +17,13 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collection;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -30,9 +31,10 @@ import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
@@ -61,17 +63,16 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 	}
 
 	@Test
-	public void federatedSinglenodeGLM() {
-		federatedGLM(Types.ExecMode.SINGLE_NODE);
+	public void federatedSinglenodeLogReg() {
+		federatedLogReg(Types.ExecMode.SINGLE_NODE);
 	}
 
 	@Test
-	public void federatedHybridGLM() {
-		federatedGLM(Types.ExecMode.HYBRID);
+	public void federatedHybridLogReg() {
+		federatedLogReg(Types.ExecMode.HYBRID);
 	}
 
-	
-	public void federatedGLM(Types.ExecMode execMode) {
+	public void federatedLogReg(Types.ExecMode execMode) {
 		ExecMode platformOld = setExecMode(execMode);
 
 		getAndLoadTestConfiguration(TEST_NAME);
@@ -94,13 +95,25 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+
+		BufferedReader output = new BufferedReader(new InputStreamReader(t1.getInputStream()));
+		BufferedReader error = new BufferedReader(new InputStreamReader(t1.getInputStream()));
+
+		Thread t = new Thread(() -> {
+			output.lines().forEach(s -> System.out.println(s));
+		});
+		Thread te = new Thread(() -> {
+			error.lines().forEach(s -> System.err.println(s));
+		});
+		t.start();
+		te.start();
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 		setOutputBuffering(false);
-		
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"), expected("Z")};
@@ -108,8 +121,8 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats", "30",
-			"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "30", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
 			"in_Y=" + input("Y"), "out=" + output("Z")};
 		runTest(true, false, null, -1);
@@ -118,16 +131,19 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 		compareResults(1e-9);
 
 		TestUtils.shutdownThreads(t1, t2);
+		TestUtils.shutdownThreads(t, te);
 
 		// check for federated operations
-		Assert.assertTrue("contains federated matrix mult",heavyHittersContainsString("fed_ba+*"));
-		Assert.assertTrue("contains federated row unary aggregate",heavyHittersContainsString("fed_uark+","fed_uarsqk+"));
-		Assert.assertTrue("contains federated matrix mult chain or transpose",heavyHittersContainsString("fed_mmchain", "fed_r'"));
-		
-		//check that federated input files are still existing
+		Assert.assertTrue("contains federated matrix mult", heavyHittersContainsString("fed_ba+*"));
+		Assert.assertTrue("contains federated row unary aggregate",
+			heavyHittersContainsString("fed_uark+", "fed_uarsqk+"));
+		Assert.assertTrue("contains federated matrix mult chain or transpose",
+			heavyHittersContainsString("fed_mmchain", "fed_r'"));
+
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
-		
+
 		resetExecMode(platformOld);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 4b4457a..7a5a2fd 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,10 +101,10 @@ public class FederatedPCATest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
similarity index 95%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index a704d3a..c042cbb 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -101,10 +101,10 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
index 27323d8..ebdee88 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,7 +78,7 @@ public class FederatedYL2SVMTest extends AutomatedTestBase {
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index bb8ae52..052234d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,8 +99,8 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index 0016331..187648b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -100,8 +100,8 @@ public class FederatedBinaryVectorTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
index 8125bfe..74180be 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
@@ -120,7 +120,7 @@ public class FederatedConstructionTest extends AutomatedTestBase {
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
similarity index 99%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
index f67bd9b..becb246 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runners.Parameterized;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index 7968dd7..c75b0b5 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -96,8 +96,8 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
index 8c60cec..5cac52c 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.controlprogram.federated.*;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index 0f28a7b..e59eea8 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -79,7 +79,7 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols));
 
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
index 69a743c..c6d4be6 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,7 +78,7 @@ public class FederatedSumTest extends AutomatedTestBase {
 		double[][] A = getRandomMatrix(rows / 2, cols, -10, 10, 1, 1);
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows / 2, cols, blocksize, (rows / 2) * cols));
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
index 6ecb5ba..b3180cc 100644
--- a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
@@ -314,7 +314,7 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1 = null, t2 = null;
+		Process t1 = null, t2 = null;
 
 		try {
 			getAndLoadTestConfiguration(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
index 3c2cbd6..6c7ce4b 100644
--- a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
@@ -154,7 +154,6 @@ public class FederatedWorkerHandlerTest extends AutomatedTestBase {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
 
 		getAndLoadTestConfiguration("aggregation");
 		String HOME = SCRIPT_DIR + TEST_DIR;
@@ -162,7 +161,7 @@ public class FederatedWorkerHandlerTest extends AutomatedTestBase {
 		double[][] A = getRandomMatrix(rows, cols, -10, 10, 1, 1);
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols), new PrivacyConstraint(privacyLevel));
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
@@ -224,7 +223,6 @@ public class FederatedWorkerHandlerTest extends AutomatedTestBase {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
 
 		getAndLoadTestConfiguration("transfer");
 		String HOME = SCRIPT_DIR + TEST_DIR;
@@ -233,7 +231,7 @@ public class FederatedWorkerHandlerTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols), new PrivacyConstraint(privacyLevel));
 
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
@@ -289,7 +287,7 @@ public class FederatedWorkerHandlerTest extends AutomatedTestBase {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
 
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration("matvecmult");
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
index 1f9c87d..6a5653e 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
@@ -87,7 +87,7 @@ public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
 		ExecMode platformOld = rtplatform;
 		rtplatform = ExecMode.SINGLE_NODE;
 
-		Thread t1 = null, t2 = null, t3 = null, t4 = null;
+		Process t1 = null, t2 = null, t3 = null, t4 = null;
 		try {
 			getAndLoadTestConfiguration(TEST_NAME1);