You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/12/16 22:32:46 UTC

[systemds] branch main updated: [SYSTEMDS-3185] Multi-tenant federated workers (variable isolation)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cc5239  [SYSTEMDS-3185] Multi-tenant federated workers (variable isolation)
5cc5239 is described below

commit 5cc523971854cdf4f22e6199987a86e213fae4e2
Author: ywcb00 <yw...@ywcb.org>
AuthorDate: Thu Dec 16 23:32:22 2021 +0100

    [SYSTEMDS-3185] Multi-tenant federated workers (variable isolation)
    
    Closes #1421.
---
 .../federated/ExecutionContextMap.java             |  11 +
 .../federated/FederatedLocalData.java              |   9 +-
 .../federated/FederatedLookupTable.java            | 132 +++++++
 .../controlprogram/federated/FederatedRequest.java |   7 +
 .../controlprogram/federated/FederatedWorker.java  |   6 +-
 .../federated/FederatedWorkerHandler.java          |  91 +++--
 .../controlprogram/parfor/util/IDHandler.java      |  69 ++--
 src/test/config/SystemDS-MultiTenant-config.xml    |  23 ++
 .../org/apache/sysds/test/AutomatedTestBase.java   |   2 +-
 .../multitenant/FederatedMultiTenantTest.java      | 404 +++++++++++++++++++++
 .../test/functions/lineage/LineageFedReuseAlg.java |  30 +-
 .../multitenant/FederatedMultiTenantTest.dml       |  63 ++++
 12 files changed, 758 insertions(+), 89 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
index ef4f6d6..21b6541 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
@@ -82,4 +82,15 @@ public class ExecutionContextMap {
 		ec2.setAutoCreateVars(true); //w/o createvar inst
 		return ec2;
 	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(super.toString());
+		sb.append("\nMain EC: ");
+		sb.append(_main.toString());
+		sb.append("ParFor ECs: ");
+		sb.append(_parEc.toString());
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
index 1589dc3..b1a4c6d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
@@ -24,18 +24,21 @@ import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 
 public class FederatedLocalData extends FederatedData {
 	protected final static Logger log = Logger.getLogger(FederatedWorkerHandler.class);
 
-	private static final ExecutionContextMap ecm = new ExecutionContextMap();
-	private static final FederatedWorkerHandler fwh = new FederatedWorkerHandler(ecm);
+	private static final FederatedLookupTable _flt = new FederatedLookupTable();
+	private static final FederatedWorkerHandler _fwh = new FederatedWorkerHandler(_flt);
 
 	private final CacheableData<?> _data;
 
 	public FederatedLocalData(long id, CacheableData<?> data) {
 		super(data.getDataType(), null, data.getFileName());
 		_data = data;
+		long pid = Long.valueOf(IDHandler.obtainProcessID());
+		ExecutionContextMap ecm = _flt.getECM(FederatedLookupTable.NOHOST, pid);
 		synchronized(ecm) {
 			ecm.get(-1).setVariable(Long.toString(id), _data);
 		}
@@ -54,6 +57,6 @@ public class FederatedLocalData extends FederatedData {
 
 	@Override
 	public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest... request) {
-		return CompletableFuture.completedFuture(fwh.createResponse(request));
+		return CompletableFuture.completedFuture(_fwh.createResponse(request));
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java
new file mode 100644
index 0000000..905ab57
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup table mapping from a FedUniqueCoordID (funCID) to an
+ * ExecutionContextMap (ECM) so that every coordinator can address federated
+ * variables with its own local sequential variable IDs. Therefore, the IDs
+ * among different coordinators do not have to be distinct, as every
+ * coordinator works with a seperate ECM at the FederatedWorker.
+ */
+public class FederatedLookupTable {
+	// the NOHOST constant is needed for creating FederatedLocalData where there
+	// is no actual network connection (and hence no host either)
+	public static final String NOHOST = "nohost";
+
+	protected static Logger log = Logger.getLogger(FederatedLookupTable.class);
+
+	// stores the mapping between the funCID and the corresponding ExecutionContextMap
+	private final Map<FedUniqueCoordID, ExecutionContextMap> _lookup_table;
+
+	public FederatedLookupTable() {
+		_lookup_table = new ConcurrentHashMap<>();
+	}
+
+	/**
+	 * Get the ExecutionContextMap corresponding to the given host and pid of the
+	 * requesting coordinator from the lookup table. Create a new
+	 * ExecutionContextMap if there is no corresponding entry in the lookup table.
+	 *
+	 * @param host the host string of the requesting coordinator (usually IP address)
+	 * @param pid the process id of the requesting coordinator
+	 * @return ExecutionContextMap the ECM corresponding to the requesting coordinator
+	 */
+	public ExecutionContextMap getECM(String host, long pid) {
+		log.trace("Getting the ExecutionContextMap for coordinator " + pid + "@" + host);
+		FedUniqueCoordID funCID = new FedUniqueCoordID(host, pid);
+		ExecutionContextMap ecm = _lookup_table.computeIfAbsent(funCID,
+			k -> new ExecutionContextMap());
+		if(ecm == null) {
+			log.error("Computing federated execution context map failed. "
+				+ "No valid resolution for " + funCID.toString() + " found.");
+			throw new FederatedWorkerHandlerException("Computing federated execution context map failed. "
+				+ "No valid resolution for " + funCID.toString() + " found.");
+		}
+		return ecm;
+	}
+
+	/**
+	 * Check if there is a mapped ExecutionContextMap for the coordinator
+	 * with the given host and pid.
+	 *
+	 * @param host the host string of the requesting coordinator (usually IP address)
+	 * @param pid the process id of the requesting coordinator
+	 * @return boolean true if there is a lookup table entry, otherwise false
+	 */
+	public boolean containsFunCID(String host, long pid) {
+		FedUniqueCoordID funCID = new FedUniqueCoordID(host, pid);
+		return _lookup_table.containsKey(funCID);
+	}
+
+	@Override
+	public String toString() {
+		return _lookup_table.toString();
+	}
+
+
+	/**
+	 * Class to collect the information needed to identify a specific coordinator.
+	 */
+	private static class FedUniqueCoordID {
+		private final String _host;
+		private final long _pid;
+
+		public FedUniqueCoordID(String host, long pid) {
+			_host = host;
+			_pid = pid;
+		}
+
+		@Override
+		public final boolean equals(Object obj) {
+			if(this == obj)
+				return true;
+			if(obj == null)
+				return false;
+			if(!(obj instanceof FedUniqueCoordID))
+				return false;
+
+			FedUniqueCoordID funCID = (FedUniqueCoordID) obj;
+
+			return Objects.equals(_host, funCID._host)
+				&& (_pid == funCID._pid);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(_host, _pid);
+		}
+
+		@Override
+		public String toString() {
+			StringBuilder sb = new StringBuilder();
+			sb.append(_pid);
+			sb.append("@");
+			sb.append(_host);
+			return sb.toString();
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index fd98456..ad9a711 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -33,6 +33,7 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheDataOutput;
 import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.utils.Statistics;
 
@@ -56,6 +57,7 @@ public class FederatedRequest implements Serializable {
 	private List<Object> _data;
 	private boolean _checkPrivacy;
 	private List<Long> _checksums;
+	private long _pid;
 
 	public FederatedRequest(RequestType method) {
 		this(method, FederationUtils.getNextFedDataID(), new ArrayList<>());
@@ -74,6 +76,7 @@ public class FederatedRequest implements Serializable {
 		_method = method;
 		_id = id;
 		_data = data;
+		_pid = Long.valueOf(IDHandler.obtainProcessID());
 		setCheckPrivacy();
 		if (DMLScript.LINEAGE && method == RequestType.PUT_VAR)
 			setChecksum();
@@ -95,6 +98,10 @@ public class FederatedRequest implements Serializable {
 		_tid = tid;
 	}
 
+	public long getPID() {
+		return _pid;
+	}
+
 	public Object getParam(int i) {
 		return _data.get(i);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index 05414b4..d324ee6 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -46,10 +46,10 @@ public class FederatedWorker {
 	protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
 	private int _port;
-	private final ExecutionContextMap _ecm;
+	private final FederatedLookupTable _flt;
 
 	public FederatedWorker(int port) {
-		_ecm = new ExecutionContextMap();
+		_flt = new FederatedLookupTable();
 		_port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
 	}
 
@@ -77,7 +77,7 @@ public class FederatedWorker {
 							new ObjectDecoder(Integer.MAX_VALUE,
 								ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
 						cp.addLast("ObjectEncoder", new ObjectEncoder());
-						cp.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_ecm));
+						cp.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_flt));
 					}
 				}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
 			log.info("Starting Federated Worker server at port: " + _port);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 1d0eb79..28f2932 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -21,6 +21,8 @@ package org.apache.sysds.runtime.controlprogram.federated;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Arrays;
 
 import io.netty.channel.ChannelFuture;
@@ -73,7 +75,7 @@ import org.apache.sysds.utils.Statistics;
 public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 	private static final Logger LOG = Logger.getLogger(FederatedWorkerHandler.class);
 
-	private final ExecutionContextMap _ecm;
+	private final FederatedLookupTable _flt;
 
 	/**
 	 * Create a Federated Worker Handler.
@@ -81,24 +83,46 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 	 * Note: federated worker handler created for every command; and concurrent parfor threads at coordinator need
 	 * separate execution contexts at the federated sites too
 	 * 
-	 * @param ecm A execution context, used to map variables and execution.
+	 * @param flt The Federated Lookup Table of the current Federated Worker.
 	 */
-	public FederatedWorkerHandler(ExecutionContextMap ecm) {
-		_ecm = ecm;
+	public FederatedWorkerHandler(FederatedLookupTable flt) {
+		_flt = flt;
 	}
 
 	@Override
 	public void channelRead(ChannelHandlerContext ctx, Object msg) {
-		ctx.writeAndFlush(createResponse(msg)).addListener(new CloseListener());
+		ctx.writeAndFlush(createResponse(msg, ctx.channel().remoteAddress()))
+			.addListener(new CloseListener());
 	}
 
 	protected FederatedResponse createResponse(Object msg) {
+		return createResponse(msg, FederatedLookupTable.NOHOST);
+	}
+
+	private FederatedResponse createResponse(Object msg, SocketAddress remoteAddress) {
+		String host;
+		if(remoteAddress instanceof InetSocketAddress) {
+			host = ((InetSocketAddress) remoteAddress).getHostString();
+		}
+		else if(remoteAddress instanceof SocketAddress) {
+			host = remoteAddress.toString().split(":")[0].split("/")[1];
+		}
+		else {
+			LOG.warn("Given remote address of coordinator is null. Continuing with "
+				+ FederatedLookupTable.NOHOST + " as host identifier.");
+			host = FederatedLookupTable.NOHOST;
+		}
+
+		return createResponse(msg, host);
+	}
+
+	private FederatedResponse createResponse(Object msg, String remoteHost) {
 		if(!(msg instanceof FederatedRequest[]))
 			return new FederatedResponse(ResponseType.ERROR,
 				new FederatedWorkerHandlerException("Received object of wrong instance 'FederatedRequest[]'."));
 		final FederatedRequest[] requests = (FederatedRequest[]) msg;
 		try {
-			return createResponse(requests);
+			return createResponse(requests, remoteHost);
 		}
 		catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
 			// Here we control the error message, therefore it is allowed to send the stack trace with the response
@@ -112,20 +136,21 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 	}
 
-	private FederatedResponse createResponse(FederatedRequest[] requests)
+	private FederatedResponse createResponse(FederatedRequest[] requests, String remoteHost)
 		throws DMLPrivacyException, FederatedWorkerHandlerException, Exception {
 		FederatedResponse response = null; // last response
 		boolean containsCLEAR = false;
 		for(int i = 0; i < requests.length; i++) {
 			final FederatedRequest request = requests[i];
 			final RequestType t = request.getType();
+			ExecutionContextMap ecm = _flt.getECM(remoteHost, request.getPID());
 			logRequests(request, i, requests.length);
 
 			PrivacyMonitor.setCheckPrivacy(request.checkPrivacy());
 			PrivacyMonitor.clearCheckedConstraints();
 
 			// execute command and handle privacy constraints
-			final FederatedResponse tmp = executeCommand(request);
+			final FederatedResponse tmp = executeCommand(request, ecm);
 			conditionalAddCheckedConstraints(request, tmp);
 
 			// select the response
@@ -176,22 +201,22 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 			response.setCheckedConstraints(PrivacyMonitor.getCheckedConstraints());
 	}
 
-	private FederatedResponse executeCommand(FederatedRequest request)
+	private FederatedResponse executeCommand(FederatedRequest request, ExecutionContextMap ecm)
 		throws DMLPrivacyException, FederatedWorkerHandlerException, Exception {
 		final RequestType method = request.getType();
 		switch(method) {
 			case READ_VAR:
-				return readData(request); // matrix/frame
+				return readData(request, ecm); // matrix/frame
 			case PUT_VAR:
-				return putVariable(request);
+				return putVariable(request, ecm);
 			case GET_VAR:
-				return getVariable(request);
+				return getVariable(request, ecm);
 			case EXEC_INST:
-				return execInstruction(request);
+				return execInstruction(request, ecm);
 			case EXEC_UDF:
-				return execUDF(request);
+				return execUDF(request, ecm);
 			case CLEAR:
-				return execClear();
+				return execClear(ecm);
 			case NOOP:
 				return execNoop();
 			default:
@@ -200,14 +225,15 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 	}
 
-	private FederatedResponse readData(FederatedRequest request) {
+	private FederatedResponse readData(FederatedRequest request, ExecutionContextMap ecm) {
 		checkNumParams(request.getNumParams(), 2);
 		String filename = (String) request.getParam(0);
 		DataType dt = DataType.valueOf((String) request.getParam(1));
-		return readData(filename, dt, request.getID(), request.getTID());
+		return readData(filename, dt, request.getID(), request.getTID(), ecm);
 	}
 
-	private FederatedResponse readData(String filename, Types.DataType dataType, long id, long tid) {
+	private FederatedResponse readData(String filename, Types.DataType dataType,
+		long id, long tid, ExecutionContextMap ecm) {
 		MatrixCharacteristics mc = new MatrixCharacteristics();
 		mc.setBlocksize(ConfigurationManager.getBlocksize());
 		CacheableData<?> cd;
@@ -260,11 +286,11 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		if(fmt == FileFormat.CSV)
 			cd.setFileFormatProperties(new FileFormatPropertiesCSV(header, delim, DataExpression.DEFAULT_DELIM_SPARSE));
 		cd.enableCleanup(false); // guard against deletion
-		_ecm.get(tid).setVariable(String.valueOf(id), cd);
+		ecm.get(tid).setVariable(String.valueOf(id), cd);
 
 		if(DMLScript.LINEAGE)
 			// create a literal type lineage item with the file name
-			_ecm.get(tid).getLineage().set(String.valueOf(id), new LineageItem(filename));
+			ecm.get(tid).getLineage().set(String.valueOf(id), new LineageItem(filename));
 
 		if(dataType == Types.DataType.FRAME) {
 			FrameObject frameObject = (FrameObject) cd;
@@ -276,10 +302,10 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		return new FederatedResponse(ResponseType.SUCCESS, new Object[] {id, mc});
 	}
 
-	private FederatedResponse putVariable(FederatedRequest request) {
+	private FederatedResponse putVariable(FederatedRequest request, ExecutionContextMap ecm) {
 		checkNumParams(request.getNumParams(), 1, 2);
 		final String varName = String.valueOf(request.getID());
-		ExecutionContext ec = _ecm.get(request.getTID());
+		ExecutionContext ec = ecm.get(request.getTID());
 
 		if(ec.containsVariable(varName)) {
 			Data tgtData = ec.removeVariable(varName);
@@ -312,9 +338,9 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
 	}
 
-	private FederatedResponse getVariable(FederatedRequest request) {
+	private FederatedResponse getVariable(FederatedRequest request, ExecutionContextMap ecm) {
 		checkNumParams(request.getNumParams(), 0);
-		ExecutionContext ec = _ecm.get(request.getTID());
+		ExecutionContext ec = ecm.get(request.getTID());
 		if(!ec.containsVariable(String.valueOf(request.getID())))
 			throw new FederatedWorkerHandlerException(
 				"Variable " + request.getID() + " does not exist at federated worker.");
@@ -336,17 +362,18 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 	}
 
-	private FederatedResponse execInstruction(FederatedRequest request) throws Exception {
-		ExecutionContext ec = _ecm.get(request.getTID());
+	private FederatedResponse execInstruction(FederatedRequest request, ExecutionContextMap ecm) throws Exception {
+		ExecutionContext ec = ecm.get(request.getTID());
 		
 		//handle missing spark execution context
 		//TODO handling of spark instructions should be under control of federated site not coordinator
 		Instruction receivedInstruction = InstructionParser.parseSingleInstruction((String) request.getParam(0));
 		if(receivedInstruction.getType() == IType.SPARK
 			&& !(ec instanceof SparkExecutionContext) ) {
-			_ecm.convertToSparkCtx();
-			ec = _ecm.get(request.getTID());
+			ecm.convertToSparkCtx();
+			ec = ecm.get(request.getTID());
 		}
+
 		BasicProgramBlock pb = new BasicProgramBlock(null);
 		pb.getInstructions().clear();
 		pb.getInstructions().add(receivedInstruction);
@@ -363,9 +390,9 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
 	}
 
-	private FederatedResponse execUDF(FederatedRequest request) {
+	private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap ecm) {
 		checkNumParams(request.getNumParams(), 1);
-		ExecutionContext ec = _ecm.get(request.getTID());
+		ExecutionContext ec = ecm.get(request.getTID());
 
 		// get function and input parameters
 		FederatedUDF udf = (FederatedUDF) request.getParam(0);
@@ -401,9 +428,9 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 	}
 
-	private FederatedResponse execClear() {
+	private FederatedResponse execClear(ExecutionContextMap ecm) {
 		try {
-			_ecm.clear();
+			ecm.clear();
 		}
 		catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
 			throw ex;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
index 0f9b7ef..f863562 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
@@ -33,24 +33,21 @@ import java.net.InetAddress;
  */
 public class IDHandler 
 {
-	public static int extractIntID( String taskID )
-	{
+	public static int extractIntID( String taskID ) {
 		int maxlen = (int)(Math.log10(Integer.MAX_VALUE));
 		int intVal = (int)extractID( taskID, maxlen );
-		return intVal;		
-		
+		return intVal;
 	}
 
-	public static long concatIntIDsToLong( int part1, int part2 )
-	{
+	public static long concatIntIDsToLong( int part1, int part2 ) {
 		//big-endian version (in java uses only big endian)
 		long value = ((long)part1) << 32; //unsigned shift of part1 to first 4bytes
 		value = value | part2;            //bitwise OR with part2 (second 4bytes)
-		
+
 		//*-endian version 
 		//long value = ((long)part1)*(long)Math.pow(2, 32);
 		//value += part2;
-		
+
 		return value;
 	}
 
@@ -61,56 +58,59 @@ public class IDHandler
 	 * @param part if part is 1, use first 4 bytes. if part is 2, use second 4 bytes!
 	 * @return return int id, or -1 if part is not 1 or 2!
 	 */
-	public static int extractIntIDFromLong( long val, int part )
-	{
+	public static int extractIntIDFromLong( long val, int part ) {
 		int ret = -1;
 		if( part == 1 )
 			ret = (int)(val >>> 32);
 		else if( part == 2 )
 			ret = (int)val; 
-				
+
 		return ret;
 	}
-	
+
 	/**
 	 * Creates a unique identifier with the pattern &lt;process_id&gt;_&lt;host_ip&gt;.
 	 * 
 	 * @return distributed unique id
 	 */
-	public static String createDistributedUniqueID() 
-	{
+	public static String createDistributedUniqueID() {
 		String uuid = null;
-		
-		try
-		{
-			//get process id		 
-		    String pname = ManagementFactory.getRuntimeMXBean().getName(); //pid@hostname
-		    String pid = pname.split("@")[0];
-		    
-		    //get ip address
-		    InetAddress addr = InetAddress.getLocalHost();
-		    String host = addr.getHostAddress();
-		    	
-		    uuid = pid + "_" + host;
+
+		try {
+			String pid = obtainProcessID();
+
+			//get ip address
+			InetAddress addr = InetAddress.getLocalHost();
+			String host = addr.getHostAddress();
+
+			uuid = pid + "_" + host;
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			uuid = "0_0.0.0.0";
 		}
-		
+
 		return uuid;
 	}
 
-	private static long extractID( String taskID, int maxlen )
-	{
+	public static String obtainProcessID() {
+		//get process id
+		String pname = ManagementFactory.getRuntimeMXBean().getName(); //pid@hostname
+		String pid = pname.split("@")[0];
+		// TODO: change this as soon as we switch to a java version >= 9
+		// import java.lang.ProcessHandle;
+		// pid = ProcessHandle.current().pid();
+		return pid;
+	}
+
+	private static long extractID( String taskID, int maxlen ) {
 		//in: e.g., task_local_0002_m_000009 or task_201203111647_0898_m_000001
 		//out: e.g., 2000009
-		
+
 		//generic parsing for flexible taskID formats
 		char[] c = taskID.toCharArray(); //all chars
 		long value = 0; //1 catch leading zeros as well		
 		int count = 0;
-		
+
 		for( int i=c.length-1; i >= 0 && count<maxlen; i-- ) //start at end
 		{
 			if( c[i] >= 48 && c[i]<=57 )  //'0'-'9'
@@ -122,8 +122,7 @@ public class IDHandler
 				count++;
 			}
 		}
-		
+
 		return value;
 	}
-	
 }
diff --git a/src/test/config/SystemDS-MultiTenant-config.xml b/src/test/config/SystemDS-MultiTenant-config.xml
new file mode 100644
index 0000000..3e250cc
--- /dev/null
+++ b/src/test/config/SystemDS-MultiTenant-config.xml
@@ -0,0 +1,23 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+   <!-- The timeout of the federated tests to initialize the federated matrixes -->
+   <sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
+</root>
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 41ba0f5..0243f2a 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -140,7 +140,7 @@ public abstract class AutomatedTestBase {
 	private static final String DEBUG_TEMP_DIR = "./tmp/";
 
 	/** Directory under which config files shared across tests are located. */
-	private static final String CONFIG_DIR = "./src/test/config/";
+	protected static final String CONFIG_DIR = "./src/test/config/";
 
 	/**
 	 * Location of the SystemDS config file that we use as a template when generating the configs for each test case.
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
new file mode 100644
index 0000000..eade66c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.multitenant;
+
+import java.io.IOException;
+import java.lang.Math;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import static org.junit.Assert.fail;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedMultiTenantTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "FederatedMultiTenantTest";
+
+	private final static String TEST_DIR = "functions/federated/multitenant/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedMultiTenantTest.class.getSimpleName() + "/";
+
+	private final static double TOLERANCE = 0;
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(
+			new Object[][] {
+				{100, 1000, false},
+				// {1000, 100, true},
+		});
+	}
+
+	private ArrayList<Process> workerProcesses = new ArrayList<>();
+	private ArrayList<Process> coordinatorProcesses = new ArrayList<>();
+
+	private enum OpType {
+		SUM,
+		PARFOR_SUM,
+		WSIGMOID,
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+	}
+
+	@Test
+	public void testSumSameWorkersCP() {
+		runMultiTenantSameWorkerTest(OpType.SUM, 4, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	@Ignore
+	public void testSumSharedWorkersCP() {
+		runMultiTenantSharedWorkerTest(OpType.SUM, 3, 9, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	@Ignore
+	public void testSumSameWorkersSP() {
+		runMultiTenantSameWorkerTest(OpType.SUM, 4, ExecMode.SPARK);
+	}
+
+//FIXME still runs into blocking
+//	@Test
+//	public void testSumSharedWorkersSP() {
+//		runMultiTenantSharedWorkerTest(OpType.SUM, 3, 9, ExecMode.SPARK);
+//	}
+
+	@Test
+	@Ignore
+	public void testParforSumSameWorkersCP() {
+		runMultiTenantSameWorkerTest(OpType.PARFOR_SUM, 4, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testParforSumSharedWorkersCP() {
+		runMultiTenantSharedWorkerTest(OpType.PARFOR_SUM, 3, 9, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testParforSumSameWorkersSP() {
+		runMultiTenantSameWorkerTest(OpType.PARFOR_SUM, 4, ExecMode.SPARK);
+	}
+
+	@Test
+	@Ignore
+	public void testParforSumSharedWorkersSP() {
+		runMultiTenantSharedWorkerTest(OpType.PARFOR_SUM, 3, 9, ExecMode.SPARK);
+	}
+
+	@Test
+	public void testWSigmoidSameWorkersCP() {
+		runMultiTenantSameWorkerTest(OpType.WSIGMOID, 4, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	@Ignore
+	public void testWSigmoidSharedWorkersCP() {
+		runMultiTenantSharedWorkerTest(OpType.WSIGMOID, 3, 9, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	@Ignore
+	public void testWSigmoidSameWorkersSP() {
+		runMultiTenantSameWorkerTest(OpType.WSIGMOID, 4, ExecMode.SPARK);
+	}
+
+	@Test
+	public void testWSigmoidSharedWorkersSP() {
+		runMultiTenantSharedWorkerTest(OpType.WSIGMOID, 3, 9, ExecMode.SPARK);
+	}
+
+	// ensure that the processes are killed - even if the test throws an exception
+	@After
+	public void stopAllProcesses() {
+		for(Process p : coordinatorProcesses)
+			p.destroyForcibly();
+		for(Process p : workerProcesses)
+			p.destroyForcibly();
+	}
+
+	private void runMultiTenantSameWorkerTest(OpType opType, int numCoordinators, ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode platformOld = rtplatform;
+
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 0, 3, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 0, 3, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 0, 3, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 0, 3, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+
+		int[] workerPorts = startFedWorkers(4);
+
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// start the coordinator processes
+		String scriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "100", "-fedStats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(workerPorts[0], input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(workerPorts[1], input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(workerPorts[2], input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(workerPorts[3], input("X4")),
+			"rows=" + rows, "cols=" + cols, "testnum=" + Integer.toString(opType.ordinal()),
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase()};
+		for(int counter = 0; counter < numCoordinators; counter++)
+			coordinatorProcesses.add(startCoordinator(execMode, scriptName,
+				ArrayUtils.addAll(programArgs, "out_S=" + output("S" + counter))));
+
+		joinCoordinatorsAndVerify(opType, execMode);
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+		TestUtils.shutdownThreads(workerProcesses.toArray(new Process[0]));
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+
+	private void runMultiTenantSharedWorkerTest(OpType opType, int numCoordinators, int maxNumWorkers, ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode platformOld = rtplatform;
+
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		final int numPartitions = 4;
+		final int numSharedWorkers = numPartitions - (int)Math.floor(maxNumWorkers / numCoordinators);
+		final int numFedWorkers = (numCoordinators * (numPartitions - numSharedWorkers)) + numSharedWorkers;
+
+		// write input matrices
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 0, 3, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 0, 3, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 0, 3, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 0, 3, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+
+		int[] workerPorts = startFedWorkers(numFedWorkers);
+
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// start the coordinator processes
+		final String scriptName = HOME + TEST_NAME + ".dml";
+		for(int counter = 0; counter < numCoordinators; counter++) {
+			int workerIndexOffset = (numPartitions - numSharedWorkers) * counter;
+			programArgs = new String[] {"-config", CONFIG_DIR + "SystemDS-MultiTenant-config.xml",
+				"-stats", "100", "-fedStats", "100", "-nvargs",
+				"in_X1=" + TestUtils.federatedAddress(workerPorts[workerIndexOffset], input("X1")),
+				"in_X2=" + TestUtils.federatedAddress(workerPorts[workerIndexOffset + 1], input("X2")),
+				"in_X3=" + TestUtils.federatedAddress(workerPorts[workerIndexOffset + 2], input("X3")),
+				"in_X4=" + TestUtils.federatedAddress(workerPorts[workerIndexOffset + 3], input("X4")),
+				"rows=" + rows, "cols=" + cols, "testnum=" + Integer.toString(opType.ordinal()),
+				"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S" + counter)};
+			coordinatorProcesses.add(startCoordinator(execMode, scriptName, programArgs));
+		}
+
+		joinCoordinatorsAndVerify(opType, execMode);
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+		TestUtils.shutdownThreads(workerProcesses.toArray(new Process[0]));
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+
+	private int[] startFedWorkers(int numFedWorkers) {
+		int[] ports = new int[numFedWorkers];
+		for(int counter = 0; counter < numFedWorkers; counter++) {
+			ports[counter] = getRandomAvailablePort();
+			@SuppressWarnings("deprecation")
+			Process tmpProcess = startLocalFedWorker(ports[counter]);
+			workerProcesses.add(tmpProcess);
+		}
+		return ports;
+	}
+
+	private Process startCoordinator(ExecMode execMode, String scriptPath, String[] args) {
+		String separator = System.getProperty("file.separator");
+		String classpath = System.getProperty("java.class.path");
+		String path = System.getProperty("java.home") + separator + "bin" + separator + "java";
+
+		String em = null;
+		switch(execMode) {
+			case SINGLE_NODE:
+			em = "singlenode";
+			break;
+			case HYBRID:
+			em = "hybrid";
+			break;
+			case SPARK:
+			em = "spark";
+			break;
+		}
+
+		ArrayList<String> argsList = new ArrayList<>();
+		argsList.add("-f");
+		argsList.add(scriptPath);
+		argsList.add("-exec");
+		argsList.add(em);
+		argsList.addAll(Arrays.asList(args));
+
+		ProcessBuilder processBuilder = new ProcessBuilder(ArrayUtils.addAll(new String[]{
+			path, "-cp", classpath, DMLScript.class.getName()}, argsList.toArray(new String[0])))
+			.redirectErrorStream(true);
+		
+		Process process = null;
+		try {
+			process = processBuilder.start();
+		} catch(IOException ioe) {
+			ioe.printStackTrace();
+		}
+		
+		return process;
+	}
+
+	private void joinCoordinatorsAndVerify(OpType opType, ExecMode execMode) {
+		// join the coordinator processes
+		for(int counter = 0; counter < coordinatorProcesses.size(); counter++) {
+			Process coord = coordinatorProcesses.get(counter);
+			
+			//wait for process, but obtain logs before to avoid blocking
+			String outputLog = null, errorLog = null;
+			try {
+				outputLog = IOUtils.toString(coord.getInputStream());
+				errorLog = IOUtils.toString(coord.getErrorStream());
+				
+				coord.waitFor();
+			}
+			catch(Exception ex) {
+				ex.printStackTrace();
+			}
+			
+			// get and print the output
+			System.out.println("Output of coordinator #" + Integer.toString(counter + 1) + ":\n");
+			System.out.println(outputLog);
+			System.out.println(errorLog);
+			Assert.assertTrue(checkForHeavyHitter(opType, outputLog, execMode));
+		}
+
+		// compare the results via files
+		HashMap<CellIndex, Double> refResults = readDMLMatrixFromOutputDir("S" + 0);
+		if(refResults.isEmpty())
+			fail("The result of the first coordinator, which is taken as reference, is empty.");
+		for(int counter = 1; counter < coordinatorProcesses.size(); counter++) {
+			HashMap<CellIndex, Double> fedResults = readDMLMatrixFromOutputDir("S" + counter);
+			TestUtils.compareMatrices(fedResults, refResults, TOLERANCE, "Fed" + counter, "FedRef");
+		}
+	}
+
+	private static boolean checkForHeavyHitter(OpType opType, String outputLog, ExecMode execMode) {
+		switch(opType) {
+			case SUM:
+				return outputLog.contains("fed_uak+");
+			case PARFOR_SUM:
+				return outputLog.contains(execMode == ExecMode.SPARK ? "fed_rblk" : "fed_uak+");
+			case WSIGMOID:
+				return outputLog.contains("fed_wsigmoid");
+			default:
+				return false;
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
index 257848d..e38a4fd 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
@@ -98,23 +98,9 @@ public class LineageFedReuseAlg extends AutomatedTestBase {
 			TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 			loadTestConfiguration(config);
 
-			// Run with federated matrix and without reuse
-			fullDMLScriptName = HOME + TEST_NAME + ".dml";
-			programArgs = new String[] {"-stats", "20", 
-				"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
-				"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
-				"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-				"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + (cols + 1),
-				"in_Y=" + input("Y"), "cont=" + String.valueOf(contSplits).toUpperCase(), "out=" + expected("Z")};
-			runTest(true, false, null, -1);
-			long tsmmCount = Statistics.getCPHeavyHitterCount("tsmm");
-			long fed_tsmmCount = Statistics.getCPHeavyHitterCount("fed_tsmm");
-			long mmCount = Statistics.getCPHeavyHitterCount("ba+*");
-			long fed_mmCount = Statistics.getCPHeavyHitterCount("fed_ba+*");
-
 			// Run with federated matrix and with reuse
 			fullDMLScriptName = HOME + TEST_NAME + ".dml";
-			programArgs = new String[] {"-stats", "20", "-lineage", "reuse_full", 
+			programArgs = new String[] {"-stats", "20", "-lineage", "reuse_full",
 				"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 				"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 				"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
@@ -127,6 +113,20 @@ public class LineageFedReuseAlg extends AutomatedTestBase {
 			long mmCount_reuse = Statistics.getCPHeavyHitterCount("ba+*");
 			long fed_mmCount_reuse = Statistics.getCPHeavyHitterCount("fed_ba+*");
 
+			// Run with federated matrix and without reuse
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-stats", "20",
+				"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+				"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+				"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+				"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + (cols + 1),
+				"in_Y=" + input("Y"), "cont=" + String.valueOf(contSplits).toUpperCase(), "out=" + expected("Z")};
+			runTest(true, false, null, -1);
+			long tsmmCount = Statistics.getCPHeavyHitterCount("tsmm");
+			long fed_tsmmCount = Statistics.getCPHeavyHitterCount("fed_tsmm");
+			long mmCount = Statistics.getCPHeavyHitterCount("ba+*");
+			long fed_mmCount = Statistics.getCPHeavyHitterCount("fed_ba+*");
+
 			// compare results 
 			compareResults(1e-2);
 			// compare potentially reused instruction counts
diff --git a/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml b/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml
new file mode 100644
index 0000000..e0bef2e
--- /dev/null
+++ b/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml
@@ -0,0 +1,63 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+testnum = $testnum;
+
+if(testnum == 0) { # SUM
+  S = as.matrix(sum(X));
+}
+else if(testnum == 1) { # PARFOR_SUM
+  numiter = 5;
+  Z = matrix(0, rows=numiter, cols=1);
+  parfor( i in 1:numiter ) {
+    while(FALSE) { }
+    Y = X + i;
+    while(FALSE) { }
+    Z[i, 1] = sum(Y);
+  }
+  S = as.matrix(0);
+  for( i in 1:numiter ) {
+    while(FALSE) { }
+    S = S + Z[i, 1];
+  }
+}
+else if(testnum == 2) { # WSIGMOID
+  N = nrow(X);
+  M = ncol(X);
+
+  U = rand(rows=N, cols=15, seed=123);
+  V = rand(rows=M, cols=15, seed=456);
+
+  UV = U %*% t(V);
+  S = X * log(1 / (1 + exp(-UV)));
+}
+
+write(S, $out_S);