You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/08/08 18:52:50 UTC

[systemds] branch master updated: [SYSTEMDS-2600] Rework federated runtime backend (framework, ops)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new a4f992e  [SYSTEMDS-2600] Rework federated runtime backend (framework, ops)
a4f992e is described below

commit a4f992ed86d92cff95b160dab0c852b5434bed25
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Aug 8 20:51:25 2020 +0200

    [SYSTEMDS-2600] Rework federated runtime backend (framework, ops)
    
    This patch makes a major rework of the exiting federated runtime backend
    and operations in order to simplify the joint development of all
    remaining federated operations.
    
    The new design has only four command types: read, put, get, exec_inst,
    which allows to read federated matrices, put and get variables, and
    execute arbitrary instructions over these variables. With this approach,
    we can reuse the existing symbol table and CP/Spark instructions and
    only need to handle their orchestration and global compensations.
    Furthermore, the new design adds several primitives like broadcast,
    broadcastSliced, aggregations, and rbind/cbind and more convenient data
    structures.
    
    Finally, this patch also includes minor reworks of the execution
    context, and reblock rewrite to allow for specific characteristics of
    federated execution.
---
 src/main/java/org/apache/sysds/common/Types.java   |   2 +-
 .../hops/rewrite/RewriteBlockSizeAndReblock.java   |   5 +-
 .../controlprogram/caching/CacheableData.java      |  10 +-
 .../controlprogram/caching/FrameObject.java        |   3 +-
 .../controlprogram/caching/MatrixObject.java       |   4 +-
 .../controlprogram/context/ExecutionContext.java   |  56 +++-
 .../context/SparkExecutionContext.java             |   6 +-
 .../controlprogram/federated/FederatedData.java    |  57 +---
 .../controlprogram/federated/FederatedRange.java   |  10 +
 .../controlprogram/federated/FederatedRequest.java |  50 ++-
 .../federated/FederatedResponse.java               |  22 +-
 .../controlprogram/federated/FederatedWorker.java  |  24 +-
 .../federated/FederatedWorkerHandler.java          | 278 ++++++----------
 .../federated/FederatedWorkerHandlerException.java |   4 +
 .../controlprogram/federated/FederationMap.java    | 153 +++++++++
 .../controlprogram/federated/FederationUtils.java  | 125 +++++++
 .../controlprogram/federated/LibFederatedAgg.java  | 103 ------
 .../federated/LibFederatedAppend.java              |  80 -----
 .../cp/MatrixIndexingCPInstruction.java            |   2 +-
 .../instructions/cp/VariableCPInstruction.java     |   7 +
 .../fed/AggregateBinaryFEDInstruction.java         | 359 ++-------------------
 .../fed/AggregateUnaryFEDInstruction.java          |  60 ++--
 .../instructions/fed/AppendFEDInstruction.java     |  72 +++--
 .../fed/BinaryMatrixScalarFEDInstruction.java      |  75 ++---
 .../instructions/fed/InitFEDInstruction.java       |  16 +-
 .../matrix/operators/AggregateUnaryOperator.java   |  10 +
 .../apache/sysds/runtime/util/UtilFunctions.java   |  26 --
 .../org/apache/sysds/test/AutomatedTestBase.java   |   2 +-
 .../federated/FederatedConstructionTest.java       |   4 +-
 .../functions/federated/FederatedMultiplyTest.java |   6 +-
 .../functions/federated/FederatedRCBindTest.java   |   7 +-
 .../test/functions/federated/FederatedSumTest.java |   4 +-
 .../functions/federated/FederatedSumTest.dml       |   1 +
 .../FederatedMatrixAdditionScalar.dml              |   1 +
 34 files changed, 692 insertions(+), 952 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index b3f8de1..92027a5 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -37,7 +37,7 @@ public class Types
 	/**
 	 * Execution type of individual operations.
 	 */
-	public enum ExecType { CP, CP_FILE, SPARK, GPU, INVALID }
+	public enum ExecType { CP, CP_FILE, SPARK, GPU, FED, INVALID }
 	
 	/**
 	 * Data types (tensor, matrix, scalar, frame, object, unknown).
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
index 1da6aa5..cc24919 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -134,8 +134,9 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule
 					}
 				}
 				else if (dop.getOp() == OpOpData.FEDERATED) {
-					// TODO maybe do something here?
-				} else {
+					dop.setBlocksize(blocksize);
+				}
+				else {
 					throw new HopsException(hop.printErrorLocation() + "unexpected non-scalar Data HOP in reblock.\n");
 				}
 			}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 602393e..590b3e5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -32,8 +32,7 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.instructions.cp.Data;
@@ -170,8 +169,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	 */
 	protected PrivacyConstraint _privacyConstraint = null;
 	
-	protected Map<FederatedRange, FederatedData> _fedMapping = null;
-	
+	protected FederationMap _fedMapping = null;
 	
 	/** The name of HDFS file in which the data is backed up. */
 	protected String _hdfsFileName = null; // file name and path
@@ -357,7 +355,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	 * Gets the mapping of indices ranges to federated objects.
 	 * @return fedMapping mapping
 	 */
-	public Map<FederatedRange, FederatedData> getFedMapping() {
+	public FederationMap getFedMapping() {
 		return _fedMapping;
 	}
 	
@@ -365,7 +363,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	 * Sets the mapping of indices ranges to federated objects.
 	 * @param fedMapping mapping
 	 */
-	public void setFedMapping(Map<FederatedRange, FederatedData> fedMapping) {
+	public void setFedMapping(FederationMap fedMapping) {
 		_fedMapping = fedMapping;
 	}
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 19c33a9..ef6e790 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -47,7 +47,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Future;
 
-import static org.apache.sysds.runtime.util.UtilFunctions.requestFederatedData;
 
 public class FrameObject extends CacheableData<FrameBlock>
 {
@@ -169,7 +168,7 @@ public class FrameObject extends CacheableData<FrameBlock>
 		FrameBlock result = new FrameBlock(_schema);
 		// provide long support?
 		result.ensureAllocatedColumns((int) _metaData.getDataCharacteristics().getRows());
-		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = requestFederatedData(_fedMapping);
+		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
 		try {
 			for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
 				FederatedRange range = readResponse.getLeft();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 7509c02..6216ba5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -19,8 +19,6 @@
 
 package org.apache.sysds.runtime.controlprogram.caching;
 
-import static org.apache.sysds.runtime.util.UtilFunctions.requestFederatedData;
-
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.util.List;
@@ -405,7 +403,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 		long[] dims = getDataCharacteristics().getDims();
 		// TODO sparse optimization
 		MatrixBlock result = new MatrixBlock((int) dims[0], (int) dims[1], false);
-		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = requestFederatedData(_fedMapping);
+		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
 		try {
 			for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
 				FederatedRange range = readResponse.getLeft();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index 3354eb8..7be3bfd 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -22,11 +22,15 @@ package org.apache.sysds.runtime.controlprogram.context;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -66,7 +70,8 @@ public class ExecutionContext {
 	
 	//symbol table
 	protected LocalVariableMap _variables;
-
+	protected boolean _autoCreateVars;
+	
 	//lineage map, cache, prepared dedup blocks
 	protected Lineage _lineage;
 
@@ -83,12 +88,14 @@ public class ExecutionContext {
 	protected ExecutionContext( boolean allocateVariableMap, boolean allocateLineage, Program prog ) {
 		//protected constructor to force use of ExecutionContextFactory
 		_variables = allocateVariableMap ? new LocalVariableMap() : null;
+		_autoCreateVars = false;
 		_lineage = allocateLineage ? new Lineage() : null;
 		_prog = prog;
 	}
 
 	public ExecutionContext(LocalVariableMap vars) {
 		_variables = vars;
+		_autoCreateVars = false;
 		_lineage = null;
 		_prog = null;
 	}
@@ -116,6 +123,14 @@ public class ExecutionContext {
 	public void setLineage(Lineage lineage) {
 		_lineage = lineage;
 	}
+	
+	public boolean isAutoCreateVars() {
+		return _autoCreateVars;
+	}
+	
+	public void setAutoCreateVars(boolean flag) {
+		_autoCreateVars = flag;
+	}
 
 	/**
 	 * Get the i-th GPUContext
@@ -502,6 +517,8 @@ public class ExecutionContext {
 	}
 	
 	public void setMatrixOutput(String varName, MatrixBlock outputData) {
+		if( isAutoCreateVars() && !containsVariable(varName) )
+			setVariable(varName, createMatrixObject(outputData));
 		MatrixObject mo = getMatrixObject(varName);
 		mo.acquireModify(outputData);
 		mo.release();
@@ -509,6 +526,8 @@ public class ExecutionContext {
 	}
 
 	public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) {
+		if( isAutoCreateVars() && !containsVariable(varName) )
+			setVariable(varName, createMatrixObject(outputData));
 		if( flag.isInPlace() ) {
 			//modify metadata to carry update status
 			MatrixObject mo = getMatrixObject(varName);
@@ -517,10 +536,6 @@ public class ExecutionContext {
 		setMatrixOutput(varName, outputData);
 	}
 
-	public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag, String opcode) {
-		setMatrixOutput(varName, outputData, flag);
-	}
-
 	public void setTensorOutput(String varName, TensorBlock outputData) {
 		TensorObject to = getTensorObject(varName);
 		to.acquireModify(outputData);
@@ -529,11 +544,42 @@ public class ExecutionContext {
 	}
 	
 	public void setFrameOutput(String varName, FrameBlock outputData) {
+		if( isAutoCreateVars() && !containsVariable(varName) )
+			setVariable(varName, createFrameObject(outputData));
 		FrameObject fo = getFrameObject(varName);
 		fo.acquireModify(outputData);
 		fo.release();
 		setVariable(varName, fo);
 	}
+
+	public static CacheableData<?> createCacheableData(CacheBlock cb) {
+		if( cb instanceof MatrixBlock )
+			return createMatrixObject((MatrixBlock) cb);
+		else if( cb instanceof FrameBlock )
+			return createFrameObject((FrameBlock) cb);
+		return null;
+	}
+	
+	private static CacheableData<?> createMatrixObject(MatrixBlock mb) {
+		MatrixObject ret = new MatrixObject(Types.ValueType.FP64, 
+			OptimizerUtils.getUniqueTempFileName());
+		ret.acquireModify(mb);
+		ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
+			mb.getNumRows(), mb.getNumColumns()), FileFormat.BINARY));
+		ret.getMetaData().getDataCharacteristics()
+			.setBlocksize(ConfigurationManager.getBlocksize());
+		ret.release();
+		return ret;
+	}
+	
+	private static CacheableData<?> createFrameObject(FrameBlock fb) {
+		FrameObject ret = new FrameObject(OptimizerUtils.getUniqueTempFileName());
+		ret.acquireModify(fb);
+		ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
+			fb.getNumRows(), fb.getNumColumns()), FileFormat.BINARY));
+		ret.release();
+		return ret;
+	}
 	
 	public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs) {
 		return getMatrixInputs(inputs, false);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 11a4e93..510113c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -381,14 +381,14 @@ public class SparkExecutionContext extends ExecutionContext
 			rdd = mo.getRDDHandle().getRDD();
 		}
 		//CASE 2: dirty in memory data or cached result of rdd operations
-		else if( mo.isDirty() || mo.isCached(false) )
+		else if( mo.isDirty() || mo.isCached(false) || mo.isFederated() )
 		{
 			//get in-memory matrix block and parallelize it
 			//w/ guarded parallelize (fallback to export, rdd from file if too large)
 			DataCharacteristics dc = mo.getDataCharacteristics();
 			boolean fromFile = false;
-			if( !OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0) || !_parRDDs.reserve(
-					OptimizerUtils.estimatePartitionedSizeExactSparsity(dc))) {
+			if( !mo.isFederated() && (!OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0)
+				|| !_parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dc)))) {
 				if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
 					mo.exportData();
 				rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.keyClass, inputInfo.valueClass);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 32a3457..1d5f5df 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -35,6 +35,7 @@ import io.netty.util.concurrent.Promise;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.Future;
@@ -83,51 +84,15 @@ public class FederatedData {
 		return _varID != -1;
 	}
 	
-	public synchronized Future<FederatedResponse> initFederatedData() {
+	public synchronized Future<FederatedResponse> initFederatedData(long id) {
 		if(isInitialized())
 			throw new DMLRuntimeException("Tried to init already initialized data");
-		FederatedRequest.FedMethod fedMethod;
-		switch(_dataType) {
-			case MATRIX:
-				fedMethod = FederatedRequest.FedMethod.READ_MATRIX;
-				break;
-			case FRAME:
-				fedMethod = FederatedRequest.FedMethod.READ_FRAME;
-				break;
-			default:
-				throw new DMLRuntimeException("Federated datatype \"" + _dataType.toString() + "\" is not supported.");
-		}
-		FederatedRequest request = new FederatedRequest(fedMethod);
+		if(!_dataType.isMatrix() && !_dataType.isFrame())
+			throw new DMLRuntimeException("Federated datatype \"" + _dataType.toString() + "\" is not supported.");
+		_varID = id;
+		FederatedRequest request = new FederatedRequest(RequestType.READ_VAR, id);
 		request.appendParam(_filepath);
-		return executeFederatedOperation(request);
-	}
-	
-	/**
-	 * Executes an federated operation on a federated worker and default variable.
-	 *
-	 * @param request the requested operation
-	 * @param withVarID true if we should add the default varID (initialized) or false if we should not
-	 * @return the response
-	 */
-	public Future<FederatedResponse> executeFederatedOperation(FederatedRequest request, boolean withVarID) {
-		if (withVarID) {
-			if( !isInitialized() )
-				throw new DMLRuntimeException("Tried to execute federated operation on data non initialized federated data.");
-			return executeFederatedOperation(request, _varID);
-		}
-		return executeFederatedOperation(request);
-	}
-	
-	/**
-	 * Executes an federated operation on a federated worker.
-	 *
-	 * @param request the requested operation
-	 * @param varID variable ID
-	 * @return the response
-	 */
-	public Future<FederatedResponse> executeFederatedOperation(FederatedRequest request, long varID) {
-		request = request.deepClone();
-		request.appendParam(varID);
+		request.appendParam(_dataType.name());
 		return executeFederatedOperation(request);
 	}
 	
@@ -137,7 +102,7 @@ public class FederatedData {
 	 * @param request the requested operation
 	 * @return the response
 	 */
-	public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest request) {
+	public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest... request) {
 		// Careful with the number of threads. Each thread opens connections to multiple files making resulting in 
 		// java.io.IOException: Too many open files
 		EventLoopGroup workerGroup = new NioEventLoopGroup(_nrThreads);
@@ -148,9 +113,9 @@ public class FederatedData {
 				@Override
 				public void initChannel(SocketChannel ch) {
 					ch.pipeline().addLast("ObjectDecoder",
-							new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
-							.addLast("FederatedOperationHandler", handler)
-							.addLast("ObjectEncoder", new ObjectEncoder());
+						new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
+						.addLast("FederatedOperationHandler", handler)
+						.addLast("ObjectEncoder", new ObjectEncoder());
 				}
 			});
 			
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
index d8e20a5..b4f69ad 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
@@ -92,4 +92,14 @@ public class FederatedRange implements Comparable<FederatedRange> {
 	public String toString() {
 		return Arrays.toString(_beginDims) + " - " + Arrays.toString(_endDims);
 	}
+
+	public FederatedRange shift(long rshift, long cshift) {
+		//row shift
+		_beginDims[0] += rshift;
+		_endDims[0] += rshift;
+		//column shift
+		_beginDims[1] += cshift;
+		_endDims[1] += cshift;
+		return this;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 771f828..f2d53e4 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -29,36 +29,47 @@ import org.apache.sysds.api.DMLScript;
 public class FederatedRequest implements Serializable {
 	private static final long serialVersionUID = 5946781306963870394L;
 	
-	public enum FedMethod {
-		READ_MATRIX, READ_FRAME, MATVECMULT, TRANSFER, AGGREGATE, SCALAR
+	// commands sent to and excuted by federated workers
+	public enum RequestType {
+		READ_VAR, // create variable for local data, read on first access
+		PUT_VAR,  // receive data from main and store to local variable
+		GET_VAR,  // return local variable to main
+		EXEC_INST // execute arbitrary instruction over
 	}
 	
-	private FedMethod _method;
+	private RequestType _method;
+	private long _id;
 	private List<Object> _data;
 	private boolean _checkPrivacy;
 	
-	public FederatedRequest(FedMethod method, List<Object> data) {
-		_method = method;
-		_data = data;
-		setCheckPrivacy();
+	
+	public FederatedRequest(RequestType method) {
+		this(method, FederationUtils.getNextFedDataID(), new ArrayList<>());
 	}
 	
-	public FederatedRequest(FedMethod method, Object ... datas) {
-		_method = method;
-		_data = Arrays.asList(datas);
-		setCheckPrivacy();
+	public FederatedRequest(RequestType method, long id) {
+		this(method, id, new ArrayList<>());
 	}
 	
-	public FederatedRequest(FedMethod method) {
+	public FederatedRequest(RequestType method, long id, Object ... data) {
+		this(method, id, Arrays.asList(data));
+	}
+	
+	public FederatedRequest(RequestType method, long id, List<Object> data) {
 		_method = method;
-		_data = new ArrayList<>();
+		_id = id;
+		_data = data;
 		setCheckPrivacy();
 	}
 	
-	public FedMethod getMethod() {
+	public RequestType getType() {
 		return _method;
 	}
 	
+	public long getID() {
+		return _id;
+	}
+	
 	public Object getParam(int i) {
 		return _data.get(i);
 	}
@@ -78,7 +89,7 @@ public class FederatedRequest implements Serializable {
 	}
 	
 	public FederatedRequest deepClone() {
-		return new FederatedRequest(_method, new ArrayList<>(_data));
+		return new FederatedRequest(_method, _id, new ArrayList<>(_data));
 	}
 
 	public void setCheckPrivacy(boolean checkPrivacy){
@@ -92,4 +103,13 @@ public class FederatedRequest implements Serializable {
 	public boolean checkPrivacy(){
 		return _checkPrivacy;
 	}
+	
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("FederatedRequest[");
+		sb.append(_method); sb.append(";");
+		sb.append(_id); sb.append(";");
+		sb.append(Arrays.toString(_data.toArray())); sb.append("]");
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
index 3335aae..ea03bd4 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
@@ -32,36 +32,36 @@ import org.apache.sysds.runtime.privacy.PrivacyConstraint.PrivacyLevel;
 public class FederatedResponse implements Serializable {
 	private static final long serialVersionUID = 3142180026498695091L;
 	
-	public enum Type {
+	public enum ResponseType {
 		SUCCESS,
 		SUCCESS_EMPTY,
 		ERROR,
 	}
 	
-	private FederatedResponse.Type _status;
+	private ResponseType _status;
 	private Object[] _data;
 	private Map<PrivacyLevel,LongAdder> checkedConstraints;
 	
-	public FederatedResponse(FederatedResponse.Type status) {
+	public FederatedResponse(ResponseType status) {
 		this(status, null);
 	}
 	
-	public FederatedResponse(FederatedResponse.Type status, Object[] data) {
+	public FederatedResponse(ResponseType status, Object[] data) {
 		_status = status;
 		_data = data;
-		if( _status == FederatedResponse.Type.SUCCESS && data == null )
-			_status = FederatedResponse.Type.SUCCESS_EMPTY;
+		if( _status == ResponseType.SUCCESS && data == null )
+			_status = ResponseType.SUCCESS_EMPTY;
 	}
 	
-	public FederatedResponse(FederatedResponse.Type status, Object data) {
+	public FederatedResponse(FederatedResponse.ResponseType status, Object data) {
 		_status = status;
 		_data = new Object[] {data};
-		if(_status == FederatedResponse.Type.SUCCESS && data == null)
-			_status = FederatedResponse.Type.SUCCESS_EMPTY;
+		if(_status == ResponseType.SUCCESS && data == null)
+			_status = ResponseType.SUCCESS_EMPTY;
 	}
 	
 	public boolean isSuccessful() {
-		return _status != FederatedResponse.Type.ERROR;
+		return _status != ResponseType.ERROR;
 	}
 	
 	public String getErrorMessage() {
@@ -103,7 +103,7 @@ public class FederatedResponse implements Serializable {
 		if ( checkedConstraints != null && !checkedConstraints.isEmpty() ){
 			this.checkedConstraints = new EnumMap<>(PrivacyLevel.class);
 			this.checkedConstraints.putAll(checkedConstraints);
-		}	
+		}
 	}
 
 	public void updateCheckedConstraintsLog(){
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index afed54b..1eca3a9 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -32,29 +32,29 @@ import io.netty.handler.codec.serialization.ObjectDecoder;
 import io.netty.handler.codec.serialization.ObjectEncoder;
 import org.apache.log4j.Logger;
 import org.apache.sysds.conf.DMLConfig;
-import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.instructions.cp.Data;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 
 public class FederatedWorker {
 	protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
 	private int _port;
-	private int _nrThreads = Integer.parseInt(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS);
-	private IDSequence _seq = new IDSequence();
-	private Map<Long, Data> _vars = new HashMap<>();
-
+	private final ExecutionContext _ec;
+	private final BasicProgramBlock _pb;
+	
 	public FederatedWorker(int port) {
+		_ec = ExecutionContextFactory.createContext();
+		_ec.setAutoCreateVars(true); //w/o createvar inst
+		_pb = new BasicProgramBlock(null);
 		_port = (port == -1) ?
 			Integer.parseInt(DMLConfig.DEFAULT_FEDERATED_PORT) : port;
 	}
 
 	public void run() {
 		log.info("Setting up Federated Worker");
-		EventLoopGroup bossGroup = new NioEventLoopGroup(_nrThreads);
-		EventLoopGroup workerGroup = new NioEventLoopGroup(_nrThreads);
+		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+		EventLoopGroup workerGroup = new NioEventLoopGroup(1);
 		ServerBootstrap b = new ServerBootstrap();
 		b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
 			.childHandler(new ChannelInitializer<SocketChannel>() {
@@ -65,7 +65,7 @@ public class FederatedWorker {
 							new ObjectDecoder(Integer.MAX_VALUE,
 								ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
 						.addLast("ObjectEncoder", new ObjectEncoder())
-						.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_seq, _vars));
+						.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_ec, _pb));
 				}
 			}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
 		try {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 1e2e6ea..6f6760f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -27,25 +27,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
-import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
+import org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.privacy.DMLPrivacyException;
@@ -57,39 +56,39 @@ import org.apache.wink.json4j.JSONObject;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.Arrays;
-import java.util.Map;
 
 public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 	protected static Logger log = Logger.getLogger(FederatedWorkerHandler.class);
 
-	private final IDSequence _seq;
-	private Map<Long, Data> _vars;
-
-	public FederatedWorkerHandler(IDSequence seq, Map<Long, Data> _vars2) {
-		_seq = seq;
-		_vars = _vars2;
+	private final ExecutionContext _ec;
+	private final BasicProgramBlock _pb;
+	
+	public FederatedWorkerHandler(ExecutionContext ec, BasicProgramBlock pb) {
+		_ec = ec;
+		_pb = pb;
 	}
 
 	@Override
 	public void channelRead(ChannelHandlerContext ctx, Object msg) {
 		log.debug("Received: " + msg.getClass().getSimpleName());
-		FederatedRequest request;
-		if (msg instanceof FederatedRequest)
-			request = (FederatedRequest) msg;
-		else
-			throw new DMLRuntimeException("FederatedWorkerHandler: Received object no instance of `FederatedRequest`.");
-		FederatedRequest.FedMethod method = request.getMethod();
-		log.debug("Received command: " + method.name());
-		PrivacyMonitor.setCheckPrivacy(request.checkPrivacy());
-		PrivacyMonitor.clearCheckedConstraints();
-
-		synchronized (_seq) {
-			FederatedResponse response = constructResponse(request);
+		if (!(msg instanceof FederatedRequest[]))
+			throw new DMLRuntimeException("FederatedWorkerHandler: Received object no instance of 'FederatedRequest[]'.");
+		FederatedRequest[] requests = (FederatedRequest[]) msg;
+		FederatedResponse response = null; //last response
+		
+		for( int i=0; i<requests.length; i++ ) {
+			FederatedRequest request = requests[i];
+			if( log.isDebugEnabled() )
+				log.debug("Executing command "+(i+1)+"/"+requests.length + ": " + request.getType().name());
+			PrivacyMonitor.setCheckPrivacy(request.checkPrivacy());
+			PrivacyMonitor.clearCheckedConstraints();
+	
+			response = executeCommand(request);
 			conditionalAddCheckedConstraints(request, response);
 			if (!response.isSuccessful())
-				log.error("Method " + method + " failed: " + response.getErrorMessage());
-			ctx.writeAndFlush(response).addListener(new CloseListener());
+				log.error("Command " + request.getType() + " failed: " + response.getErrorMessage());
 		}
+		ctx.writeAndFlush(response).addListener(new CloseListener());
 	}
 
 	private static void conditionalAddCheckedConstraints(FederatedRequest request, FederatedResponse response){
@@ -97,44 +96,41 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 			response.setCheckedConstraints(PrivacyMonitor.getCheckedConstraints());
 	}
 
-	private FederatedResponse constructResponse(FederatedRequest request) {
-		FederatedRequest.FedMethod method = request.getMethod();
+	private FederatedResponse executeCommand(FederatedRequest request) {
+		RequestType method = request.getType();
 		try {
 			switch (method) {
-				case READ_MATRIX:
-					return readData(request, Types.DataType.MATRIX);
-				case READ_FRAME:
-					return readData(request, Types.DataType.FRAME);
-				case MATVECMULT:
-					return executeMatVecMult(request);
-				case TRANSFER:
-					return getVariableData(request);
-				case AGGREGATE:
-					return executeAggregation(request);
-				case SCALAR:
-					return executeScalarOperation(request);
+				case READ_VAR:
+					return readData(request); //matrix/frame
+				case PUT_VAR:
+					return putVariable(request);
+				case GET_VAR:
+					return getVariable(request);
+				case EXEC_INST:
+					return execInstruction(request);
 				default:
 					String message = String.format("Method %s is not supported.", method);
-					return new FederatedResponse(FederatedResponse.Type.ERROR, new FederatedWorkerHandlerException(message));
+					return new FederatedResponse(FederatedResponse.ResponseType.ERROR, new FederatedWorkerHandlerException(message));
 			}
 		}
-		catch (DMLPrivacyException | FederatedWorkerHandlerException exception) {
-			return new FederatedResponse(FederatedResponse.Type.ERROR, exception);
+		catch (DMLPrivacyException | FederatedWorkerHandlerException ex) {
+			return new FederatedResponse(FederatedResponse.ResponseType.ERROR, ex);
 		}
-		catch (Exception exception) {
-			return new FederatedResponse(FederatedResponse.Type.ERROR, 
+		catch (Exception ex) {
+			return new FederatedResponse(FederatedResponse.ResponseType.ERROR, 
 				new FederatedWorkerHandlerException("Exception of type " 
-				+ exception.getClass() + " thrown when processing request"));
+				+ ex.getClass() + " thrown when processing request", ex));
 		}
 	}
 
-	private FederatedResponse readData(FederatedRequest request, Types.DataType dataType) {
-		checkNumParams(request.getNumParams(), 1);
+	private FederatedResponse readData(FederatedRequest request) {
+		checkNumParams(request.getNumParams(), 2);
 		String filename = (String) request.getParam(0);
-		return readData(filename, dataType);
+		DataType dt = DataType.valueOf((String)request.getParam(1));
+		return readData(filename, dt, request.getID());
 	}
 
-	private FederatedResponse readData(String filename, Types.DataType dataType) {
+	private FederatedResponse readData(String filename, Types.DataType dataType, long id) {
 		MatrixCharacteristics mc = new MatrixCharacteristics();
 		mc.setBlocksize(ConfigurationManager.getBlocksize());
 		CacheableData<?> cd;
@@ -147,7 +143,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 				break;
 			default:
 				// should NEVER happen (if we keep request codes in sync with actual behaviour)
-				return new FederatedResponse(FederatedResponse.Type.ERROR, 
+				return new FederatedResponse(FederatedResponse.ResponseType.ERROR, 
 					new FederatedWorkerHandlerException("Could not recognize datatype"));
 		}
 		
@@ -160,7 +156,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 				try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
 					JSONObject mtd = JSONHelper.parse(br);
 					if (mtd == null)
-						return new FederatedResponse(FederatedResponse.Type.ERROR, new FederatedWorkerHandlerException("Could not parse metadata file"));
+						return new FederatedResponse(FederatedResponse.ResponseType.ERROR, new FederatedWorkerHandlerException("Could not parse metadata file"));
 					mc.setRows(mtd.getLong(DataExpression.READROWPARAM));
 					mc.setCols(mtd.getLong(DataExpression.READCOLPARAM));
 					cd = PrivacyPropagator.parseAndSetPrivacyConstraint(cd, mtd);
@@ -173,140 +169,80 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 		cd.setMetaData(new MetaDataFormat(mc, fmt));
 		cd.acquireRead();
-		cd.refreshMetaData();
+		cd.refreshMetaData(); //in pinned state
 		cd.release();
-
-		long id = _seq.getNextID();
-		_vars.put(id, cd);
+		
+		//TODO spawn async load of data, otherwise on first access
+		_ec.setVariable(String.valueOf(id), cd);
+		
 		if (dataType == Types.DataType.FRAME) {
 			FrameObject frameObject = (FrameObject) cd;
-			return new FederatedResponse(FederatedResponse.Type.SUCCESS, new Object[] {id, frameObject.getSchema()});
+			return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, new Object[] {id, frameObject.getSchema()});
 		}
-		return new FederatedResponse(FederatedResponse.Type.SUCCESS, id);
+		return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, id);
 	}
-
-	private FederatedResponse executeMatVecMult(FederatedRequest request) {
-		checkNumParams(request.getNumParams(), 3);
-		MatrixBlock vector = (MatrixBlock) request.getParam(0);
-		boolean isMatVecMult = (Boolean) request.getParam(1);
-		long varID = (Long) request.getParam(2);
-
-		return executeMatVecMult(varID, vector, isMatVecMult);
-	}
-
-	private FederatedResponse executeMatVecMult(long varID, MatrixBlock vector, boolean isMatVecMult) {
-		MatrixObject matTo = (MatrixObject) _vars.get(varID);
-		matTo = PrivacyMonitor.handlePrivacy(matTo);
-		MatrixBlock matBlock1 = matTo.acquireReadAndRelease();
-		// TODO other datatypes
-		AggregateBinaryOperator ab_op = InstructionUtils
-			.getMatMultOperator(OptimizerUtils.getConstrainedNumThreads(0));
-		MatrixBlock result = isMatVecMult ?
-			matBlock1.aggregateBinaryOperations(matBlock1, vector, new MatrixBlock(), ab_op) :
-			vector.aggregateBinaryOperations(vector, matBlock1, new MatrixBlock(), ab_op);
-		return new FederatedResponse(FederatedResponse.Type.SUCCESS, result);
-	}
-
-	private FederatedResponse getVariableData(FederatedRequest request) {
+	
+	private FederatedResponse putVariable(FederatedRequest request) {
 		checkNumParams(request.getNumParams(), 1);
-		long varID = (Long) request.getParam(0);
-		return getVariableData(varID);
+		String varname = String.valueOf(request.getID());
+		if( _ec.containsVariable(varname) ) {
+			return new FederatedResponse(ResponseType.ERROR,
+				"Variable "+request.getID()+" already existing.");
+		}
+		
+		//wrap transferred cache block into cacheable data
+		Data data = null;
+		if( request.getParam(0) instanceof CacheBlock )
+			data = ExecutionContext.createCacheableData((CacheBlock) request.getParam(0));
+		else if( request.getParam(0) instanceof ScalarObject )
+			data = (ScalarObject) request.getParam(0);
+		
+		//set variable and construct empty response
+		_ec.setVariable(varname, data);
+		return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
 	}
-
+	
+	private FederatedResponse getVariable(FederatedRequest request) {
+		checkNumParams(request.getNumParams(), 0);
+		if( !_ec.containsVariable(String.valueOf(request.getID())) ) {
+			return new FederatedResponse(ResponseType.ERROR,
+				"Variable "+request.getID()+" does not exist at federated worker.");
+		}
+		//get variable and construct response
+		return getVariableData(request.getID());
+	}
+	
 	private FederatedResponse getVariableData(long varID) {
-		Data dataObject = _vars.get(varID);
+		Data dataObject = _ec.getVariable(String.valueOf(varID));
 		dataObject = PrivacyMonitor.handlePrivacy(dataObject);
 		switch (dataObject.getDataType()) {
 			case TENSOR:
-				return new FederatedResponse(FederatedResponse.Type.SUCCESS,
-					((TensorObject) dataObject).acquireReadAndRelease());
 			case MATRIX:
-				return new FederatedResponse(FederatedResponse.Type.SUCCESS,
-					((MatrixObject) dataObject).acquireReadAndRelease());
 			case FRAME:
-				return new FederatedResponse(FederatedResponse.Type.SUCCESS,
-						((FrameObject) dataObject).acquireReadAndRelease());
+				return new FederatedResponse(ResponseType.SUCCESS,
+					((CacheableData<?>) dataObject).acquireReadAndRelease());
 			case LIST:
-				return new FederatedResponse(FederatedResponse.Type.SUCCESS, ((ListObject) dataObject).getData());
-			// TODO rest of the possible datatypes
+				return new FederatedResponse(ResponseType.SUCCESS, ((ListObject) dataObject).getData());
+			case SCALAR:
+				return new FederatedResponse(ResponseType.SUCCESS, dataObject);
 			default:
-				return new FederatedResponse(FederatedResponse.Type.ERROR,
-					new FederatedWorkerHandlerException("Not possible to send datatype " + dataObject.getDataType().name()));
+				return new FederatedResponse(ResponseType.ERROR,
+					new FederatedWorkerHandlerException("Unsupported return datatype " + dataObject.getDataType().name()));
 		}
 	}
-
-	private FederatedResponse executeAggregation(FederatedRequest request) {
-		checkNumParams(request.getNumParams(), 2);
-		AggregateUnaryOperator operator = (AggregateUnaryOperator) request.getParam(0);
-		long varID = (Long) request.getParam(1);
-		return executeAggregation(varID, operator);
-	}
-
-	private FederatedResponse executeAggregation(long varID, AggregateUnaryOperator operator) {
-		Data dataObject = _vars.get(varID);
-		if (dataObject.getDataType() != Types.DataType.MATRIX) {
-			return new FederatedResponse(FederatedResponse.Type.ERROR, 
-				new FederatedWorkerHandlerException("Aggregation only supported for matrices, not for "
-				+ dataObject.getDataType().name()));
+	
+	private FederatedResponse execInstruction(FederatedRequest request) {
+		_pb.getInstructions().clear();
+		_pb.getInstructions().add(InstructionParser
+			.parseSingleInstruction((String)request.getParam(0)));
+		try {
+			_pb.execute(_ec); //execute single instruction
 		}
-		MatrixObject matrixObject = (MatrixObject) dataObject;
-		matrixObject = PrivacyMonitor.handlePrivacy(matrixObject);
-		MatrixBlock matrixBlock = matrixObject.acquireRead();
-		// create matrix for calculation with correction
-		MatrixCharacteristics mc = new MatrixCharacteristics();
-		// find out the characteristics after aggregation
-		operator.indexFn.computeDimension(matrixObject.getDataCharacteristics(), mc);
-		// make outBlock right size
-		int outNumRows = (int) mc.getRows();
-		int outNumCols = (int) mc.getCols();
-		if (operator.aggOp.existsCorrection()) {
-			// add rows for correction
-			int numMissing = operator.aggOp.correction.getNumRemovedRowsColumns();
-			if (operator.aggOp.correction.isRows())
-				outNumRows += numMissing;
-			else
-				outNumCols += numMissing;
+		catch(Exception ex) {
+			ex.printStackTrace();
+			return new FederatedResponse(ResponseType.ERROR, ex.getMessage());
 		}
-		MatrixBlock ret = new MatrixBlock(outNumRows, outNumCols, operator.aggOp.initialValue);
-		LibMatrixAgg.aggregateUnaryMatrix(matrixBlock, ret, operator);
-		// result block without correction
-		ret.dropLastRowsOrColumns(operator.aggOp.correction);
-		return new FederatedResponse(FederatedResponse.Type.SUCCESS, ret);
-	}
-
-	private FederatedResponse executeScalarOperation(FederatedRequest request) {
-		checkNumParams(request.getNumParams(), 2);
-		ScalarOperator operator = (ScalarOperator) request.getParam(0);
-		long varID = (Long) request.getParam(1);
-		return executeScalarOperation(varID, operator);
-	}
-
-	private FederatedResponse executeScalarOperation(long varID, ScalarOperator operator) {
-		Data dataObject = _vars.get(varID);
-		dataObject = PrivacyMonitor.handlePrivacy(dataObject);
-		if (dataObject.getDataType() != Types.DataType.MATRIX) {
-			return new FederatedResponse(FederatedResponse.Type.ERROR,
-				new FederatedWorkerHandlerException("FederatedWorkerHandler: ScalarOperator dont support "
-					+ dataObject.getDataType().name()));
-		}
-
-		MatrixObject matrixObject = (MatrixObject) dataObject;
-		MatrixBlock inBlock = matrixObject.acquireRead();
-		MatrixBlock retBlock = inBlock.scalarOperations(operator, new MatrixBlock());
-		return new FederatedResponse(FederatedResponse.Type.SUCCESS, retBlock);
-	}
-
-	@SuppressWarnings("unused")
-	private FederatedResponse createMatrixObject(MatrixBlock result) {
-		MatrixObject resTo = new MatrixObject(Types.ValueType.FP64, OptimizerUtils.getUniqueTempFileName());
-		MetaDataFormat metadata = new MetaDataFormat(
-			new MatrixCharacteristics(result.getNumRows(), result.getNumColumns()), FileFormat.BINARY);
-		resTo.setMetaData(metadata);
-		resTo.acquireModify(result);
-		resTo.release();
-		long result_var = _seq.getNextID();
-		_vars.put(result_var, resTo);
-		return new FederatedResponse(FederatedResponse.Type.SUCCESS, result_var);
+		return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
 	}
 
 	private static void checkNumParams(int actual, int... expected) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandlerException.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandlerException.java
index 79c1a6b..77768b3 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandlerException.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandlerException.java
@@ -37,4 +37,8 @@ public class FederatedWorkerHandlerException extends RuntimeException {
 	public FederatedWorkerHandlerException(String msg) {
 		super(msg);
 	}
+	
+	public FederatedWorkerHandlerException(String msg, Throwable t) {
+		super(msg, t);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
new file mode 100644
index 0000000..d2e2300
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class FederationMap
+{
+	private long _ID = -1;
+	private final Map<FederatedRange, FederatedData> _fedMap;
+	
+	public FederationMap(Map<FederatedRange, FederatedData> fedMap) {
+		this(-1, fedMap);
+	}
+	
+	public FederationMap(long ID, Map<FederatedRange, FederatedData> fedMap) {
+		_ID = ID;
+		_fedMap = fedMap;
+	}
+	
+	public long getID() {
+		return _ID;
+	}
+	
+	public boolean isInitialized() {
+		return _ID >= 0;
+	}
+	
+	public FederatedRequest broadcast(CacheableData<?> data) {
+		//prepare single request for all federated data
+		long id = FederationUtils.getNextFedDataID();
+		CacheBlock cb = data.acquireReadAndRelease();
+		return new FederatedRequest(RequestType.PUT_VAR, id, cb);
+	}
+	
+	public FederatedRequest broadcast(ScalarObject scalar) {
+		//prepare single request for all federated data
+		long id = FederationUtils.getNextFedDataID();
+		return new FederatedRequest(RequestType.PUT_VAR, id, scalar);
+	}
+	
+	public FederatedRequest[] broadcastSliced(CacheableData<?> data, boolean transposed) {
+		//prepare separate requests for different slices
+		long id = FederationUtils.getNextFedDataID();
+		CacheBlock cb = data.acquireReadAndRelease();
+		List<FederatedRequest> ret = new ArrayList<>();
+		for(Entry<FederatedRange, FederatedData> e : _fedMap.entrySet()) {
+			int rl = transposed ? 0 : e.getKey().getBeginDimsInt()[0];
+			int ru = transposed ? cb.getNumRows()-1 : e.getKey().getEndDimsInt()[0]-1;
+			int cl = transposed ? e.getKey().getBeginDimsInt()[0] : 0;
+			int cu = transposed ? e.getKey().getEndDimsInt()[0]-1 : cb.getNumColumns()-1;
+			CacheBlock tmp = cb.slice(rl, ru, cl, cu, new MatrixBlock());
+			ret.add(new FederatedRequest(RequestType.PUT_VAR, id, tmp));
+		}
+		return ret.toArray(new FederatedRequest[0]);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public Future<FederatedResponse>[] execute(FederatedRequest... fr) {
+		List<Future<FederatedResponse>> ret = new ArrayList<>();
+		for(Entry<FederatedRange, FederatedData> e : _fedMap.entrySet())
+			ret.add(e.getValue().executeFederatedOperation(fr));
+		return ret.toArray(new Future[0]);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public Future<FederatedResponse>[] execute(FederatedRequest[] frSlices, FederatedRequest... fr) {
+		//executes step1[] - step 2 - ... step4 (only first step federated-data-specific)
+		List<Future<FederatedResponse>> ret = new ArrayList<>(); 
+		int pos = 0;
+		for(Entry<FederatedRange, FederatedData> e : _fedMap.entrySet())
+			ret.add(e.getValue().executeFederatedOperation(addAll(frSlices[pos++], fr)));
+		return ret.toArray(new Future[0]);
+	}
+	
+	public List<Pair<FederatedRange, Future<FederatedResponse>>> requestFederatedData() {
+		if( !isInitialized() )
+			throw new DMLRuntimeException("Federated matrix read only supported on initialized FederatedData");
+		
+		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = new ArrayList<>();
+		FederatedRequest request = new FederatedRequest(RequestType.GET_VAR, _ID);
+		for(Map.Entry<FederatedRange, FederatedData> e : _fedMap.entrySet())
+			readResponses.add(new ImmutablePair<>(e.getKey(), 
+				e.getValue().executeFederatedOperation(request)));
+		return readResponses;
+	}
+	
+	public void cleanup(long... id) {
+		FederatedRequest request = new FederatedRequest(RequestType.EXEC_INST, -1,
+			VariableCPInstruction.prepareRemoveInstruction(id).toString());
+		for(FederatedData fd : _fedMap.values())
+			fd.executeFederatedOperation(request);
+	}
+	
+	private static FederatedRequest[] addAll(FederatedRequest a, FederatedRequest[] b) {
+		FederatedRequest[] ret = new FederatedRequest[b.length + 1];
+		ret[0] = a; System.arraycopy(b, 0, ret, 1, b.length);
+		return ret;
+	}
+	
+	public FederationMap copyWithNewID() {
+		return copyWithNewID(FederationUtils.getNextFedDataID());
+	}
+	
+	public FederationMap copyWithNewID(long id) {
+		Map<FederatedRange, FederatedData> map = new TreeMap<>();
+		//TODO handling of file path, but no danger as never written
+		for( Entry<FederatedRange, FederatedData> e : _fedMap.entrySet() )
+			map.put(new FederatedRange(e.getKey()), new FederatedData(e.getValue(), id));
+		return new FederationMap(id, map);
+	}
+
+	public FederationMap rbind(long offset, FederationMap that) {
+		for( Entry<FederatedRange, FederatedData> e : that._fedMap.entrySet() ) {
+			_fedMap.put(
+				new FederatedRange(e.getKey()).shift(offset, 0),
+				new FederatedData(e.getValue(), _ID));
+		}
+		return this;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
new file mode 100644
index 0000000..ab0b3aa
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated;
+
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
+import org.apache.sysds.runtime.functionobjects.KahanFunction;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+
+public class FederationUtils {
+	private static final IDSequence _idSeq = new IDSequence();
+	
+	public static long getNextFedDataID() {
+		return _idSeq.getNextID();
+	}
+	
+	public static FederatedRequest callInstruction(String inst, CPOperand varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
+		//TODO better and safe replacement of operand names --> instruction utils
+		long id = getNextFedDataID();
+		String linst = inst.replace(ExecType.SPARK.name(), ExecType.CP.name());
+		linst = linst.replace(Lop.OPERAND_DELIMITOR+varOldOut.getName(), Lop.OPERAND_DELIMITOR+String.valueOf(id));
+		for(int i=0; i<varOldIn.length; i++)
+			if( varOldIn[i] != null )
+				linst = linst.replace(Lop.OPERAND_DELIMITOR+varOldIn[i].getName(),
+					Lop.OPERAND_DELIMITOR+String.valueOf(varNewIn[i]));
+		return new FederatedRequest(RequestType.EXEC_INST, id, linst);
+	}
+
+	public static MatrixBlock aggAdd(Future<FederatedResponse>[] ffr) {
+		try {
+			BinaryOperator bop = InstructionUtils.parseBinaryOperator("+");
+			MatrixBlock ret = (MatrixBlock) (ffr[0].get().getData()[0]);
+			for (int i=1; i<ffr.length; i++) {
+				MatrixBlock tmp = (MatrixBlock) (ffr[i].get().getData()[0]);
+				ret.binaryOperationsInPlace(bop, tmp);
+			}
+			return ret;
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+	
+	public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) {
+		try {
+			MatrixBlock[] ret = new MatrixBlock[ffr.length];
+			for(int i=0; i<ffr.length; i++)
+				ret[i] = (MatrixBlock) ffr[i].get().getData()[0];
+			return ret;
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	public static MatrixBlock rbind(Future<FederatedResponse>[] ffr) {
+		// TODO handle non-contiguous cases
+		try {
+			MatrixBlock[] tmp = getResults(ffr);
+			return tmp[0].append(
+				Arrays.copyOfRange(tmp, 1, tmp.length),
+				new MatrixBlock(), false);
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
+		if( !(aop.aggOp.increOp.fn instanceof KahanFunction) ) {
+			throw new DMLRuntimeException("Unsupported aggregation operator: "
+				+ aop.aggOp.increOp.getClass().getSimpleName());
+		}
+		//compute scalar sum of partial aggregates
+		try {
+			double sum = 0; //uak+, uasqk+
+			for( Future<FederatedResponse> fr : ffr )
+				sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
+			return new DoubleObject(sum);
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
+		if( !(aop.aggOp.increOp.fn instanceof KahanFunction) ) {
+			throw new DMLRuntimeException("Unsupported aggregation operator: "
+				+ aop.aggOp.increOp.getClass().getSimpleName());
+		}
+		
+		//assumes full row partitions for row and col aggregates
+		return aop.isRowAggregate() ?  rbind(ffr) : aggAdd(ffr);
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAgg.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAgg.java
deleted file mode 100644
index feb5a68..0000000
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAgg.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.federated;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.functionobjects.KahanFunction;
-import org.apache.sysds.runtime.functionobjects.ValueFunction;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
-import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
-import org.apache.sysds.runtime.meta.MatrixCharacteristics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-/**
- * Library for federated aggregation operations.
- * <p>
- * This libary covers the following opcodes:
- * uak+
- */
-public class LibFederatedAgg
-{
-	public static MatrixBlock aggregateUnaryMatrix(MatrixObject federatedMatrix, AggregateUnaryOperator operator) {
-		// find out the characteristics after aggregation
-		MatrixCharacteristics mc = new MatrixCharacteristics();
-		operator.indexFn.computeDimension(federatedMatrix.getDataCharacteristics(), mc);
-		// make outBlock right size
-		MatrixBlock ret = new MatrixBlock((int) mc.getRows(), (int) mc.getCols(), operator.aggOp.initialValue);
-		List<Pair<FederatedRange, Future<FederatedResponse>>> idResponsePairs = new ArrayList<>();
-		// distribute aggregation operation to all workers
-		for (Map.Entry<FederatedRange, FederatedData> entry : federatedMatrix.getFedMapping().entrySet()) {
-			FederatedData fedData = entry.getValue();
-			if (!fedData.isInitialized())
-				throw new DMLRuntimeException("Not all FederatedData was initialized for federated matrix");
-			Future<FederatedResponse> future = fedData.executeFederatedOperation(
-				new FederatedRequest(FederatedRequest.FedMethod.AGGREGATE, operator), true);
-			idResponsePairs.add(new ImmutablePair<>(entry.getKey(), future));
-		}
-		try {
-			//TODO replace with block operations
-			for (Pair<FederatedRange, Future<FederatedResponse>> idResponsePair : idResponsePairs) {
-				FederatedRange range = idResponsePair.getLeft();
-				FederatedResponse federatedResponse = idResponsePair.getRight().get();
-				int[] beginDims = range.getBeginDimsInt();
-				MatrixBlock mb = (MatrixBlock) federatedResponse.getData()[0];
-				// TODO performance optimizations
-				MatrixValue.CellIndex cellIndex = new MatrixValue.CellIndex(0, 0);
-				ValueFunction valueFn = operator.aggOp.increOp.fn;
-				// Add worker response to resultBlock
-				for (int r = 0; r < mb.getNumRows(); r++)
-					for (int c = 0; c < mb.getNumColumns(); c++) {
-						// Get the output index where the result should be placed by the index function
-						// -> map input row/col to output row/col
-						cellIndex.set(r + beginDims[0], c + beginDims[1]);
-						operator.indexFn.execute(cellIndex, cellIndex);
-						int resultRow = cellIndex.row;
-						int resultCol = cellIndex.column;
-						double newValue;
-						if (valueFn instanceof KahanFunction) {
-							// TODO iterate along correct axis to use correction correctly
-							// temporary solution to execute correct overloaded method
-							KahanObject kobj = new KahanObject(ret.quickGetValue(resultRow, resultCol), 0);
-							newValue = ((KahanObject) valueFn.execute(kobj, mb.quickGetValue(r, c)))._sum;
-						}
-						else {
-							// TODO special handling for `ValueFunction`s which do not implement `.execute(double, double)`
-							// "Add" two partial calculations together with ValueFunction
-							newValue = valueFn.execute(ret.quickGetValue(resultRow, resultCol), mb.quickGetValue(r, c));
-						}
-						ret.quickSetValue(resultRow, resultCol, newValue);
-					}
-			}
-		}
-		catch (Exception e) {
-			throw new DMLRuntimeException("Federated binary aggregation failed", e);
-		}
-		return ret;
-	}
-}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAppend.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAppend.java
deleted file mode 100644
index 58ad5da..0000000
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/LibFederatedAppend.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.federated;
-
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.meta.DataCharacteristics;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class LibFederatedAppend {
-	public static MatrixObject federateAppend(MatrixObject matObject1, MatrixObject matObject2,
-		MatrixObject matObjectRet, boolean cbind)
-	{
-		Map<FederatedRange, FederatedData> fedMapping = new TreeMap<>();
-		DataCharacteristics dc = matObjectRet.getDataCharacteristics();
-		if (cbind) {
-			// check for same amount of rows for matObject1 and matObject2 should have been checked before call
-			dc.setRows(matObject1.getNumRows());
-			// added because cbind
-			long columnsLeftMat = matObject1.getNumColumns();
-			dc.setCols(columnsLeftMat + matObject2.getNumColumns());
-			
-			Map<FederatedRange, FederatedData> fedMappingLeft = matObject1.getFedMapping();
-			for (Map.Entry<FederatedRange, FederatedData> entry : fedMappingLeft.entrySet()) {
-				// note that FederatedData should not change its varId once set
-				fedMapping.put(new FederatedRange(entry.getKey()), entry.getValue());
-			}
-			Map<FederatedRange, FederatedData> fedMappingRight = matObject2.getFedMapping();
-			for (Map.Entry<FederatedRange, FederatedData> entry : fedMappingRight.entrySet()) {
-				// add offset due to cbind
-				FederatedRange range = new FederatedRange(entry.getKey());
-				range.setBeginDim(1, columnsLeftMat + range.getBeginDims()[1]);
-				range.setEndDim(1, columnsLeftMat + range.getEndDims()[1]);
-				fedMapping.put(range, entry.getValue());
-			}
-		}
-		else {
-			// check for same amount of cols for matObject1 and matObject2 should have been checked before call
-			dc.setCols(matObject1.getNumColumns());
-			// added because rbind
-			long rowsUpperMat = matObject1.getNumRows();
-			dc.setRows(rowsUpperMat + matObject2.getNumRows());
-			
-			Map<FederatedRange, FederatedData> fedMappingUpper = matObject1.getFedMapping();
-			for (Map.Entry<FederatedRange, FederatedData> entry : fedMappingUpper.entrySet()) {
-				// note that FederatedData should not change its varId once set
-				fedMapping.put(new FederatedRange(entry.getKey()), entry.getValue());
-			}
-			Map<FederatedRange, FederatedData> fedMappingLower = matObject2.getFedMapping();
-			for (Map.Entry<FederatedRange, FederatedData> entry : fedMappingLower.entrySet()) {
-				// add offset due to rbind
-				FederatedRange range = new FederatedRange(entry.getKey());
-				range.setBeginDim(0, rowsUpperMat + range.getBeginDims()[0]);
-				range.setEndDim(0, rowsUpperMat + range.getEndDims()[0]);
-				fedMapping.put(range, entry.getValue());
-			}
-		}
-		matObjectRet.setFedMapping(fedMapping);
-		dc.setNonZeros(matObject1.getNnz() + matObject2.getNnz());
-		return matObjectRet;
-	}
-}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
index d640313..db70d6b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
@@ -115,7 +115,7 @@ public final class MatrixIndexingCPInstruction extends IndexingCPInstruction {
 			resultBlock.examSparsity();
 			
 			//unpin output
-			ec.setMatrixOutput(output.getName(), resultBlock, updateType, getExtendedOpcode());
+			ec.setMatrixOutput(output.getName(), resultBlock, updateType);
 		}
 		else
 			throw new DMLRuntimeException("Invalid opcode (" + opcode +") encountered in MatrixIndexingCPInstruction.");
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 2082047..f7f3698 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.cp;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
@@ -1102,6 +1103,12 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
 		}
 	}
 	
+	public static Instruction prepareRemoveInstruction(long... varName) {
+		String[] tmp = new String[varName.length];
+		Arrays.setAll(tmp, i -> String.valueOf(varName[i]));
+		return prepareRemoveInstruction(tmp);
+	}
+	
 	public static Instruction prepareRemoveInstruction(String... varNames) {
 		StringBuilder sb = new StringBuilder();
 		sb.append("CP");
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index 2c064fc..3fe1004 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -19,31 +19,18 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
@@ -56,329 +43,53 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 	public static AggregateBinaryFEDInstruction parseInstruction(String str) {
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		String opcode = parts[0];
-		
-		if(!opcode.equalsIgnoreCase("ba+*")) {
+		if(!opcode.equalsIgnoreCase("ba+*"))
 			throw new DMLRuntimeException("AggregateBinaryInstruction.parseInstruction():: Unknown opcode " + opcode);
-		}
 		
 		InstructionUtils.checkNumFields(parts, 4);
 		CPOperand in1 = new CPOperand(parts[1]);
 		CPOperand in2 = new CPOperand(parts[2]);
 		CPOperand out = new CPOperand(parts[3]);
 		int k = Integer.parseInt(parts[4]);
-		
 		return new AggregateBinaryFEDInstruction(
 			InstructionUtils.getMatMultOperator(k), in1, in2, out, opcode, str);
 	}
 	
 	@Override
 	public void processInstruction(ExecutionContext ec) {
-		//get inputs
-		MatrixObject mo1 = ec.getMatrixObject(input1.getName());
-		MatrixObject mo2 = ec.getMatrixObject(input2.getName());
-		MatrixObject out = ec.getMatrixObject(output.getName());
-		
-		// compute matrix-vector multiplication
-		AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
-		if (mo1.isFederated() && mo2.getNumColumns() == 1) {// MV
-			MatrixBlock vector = mo2.acquireRead();
-			federatedAggregateBinaryMV(mo1, vector, out, ab_op, true);
-			mo2.release();
-		}
-		else if (mo2.isFederated() && mo1.getNumRows() == 1) {// VM
-			MatrixBlock vector = mo1.acquireRead();
-			federatedAggregateBinaryMV(mo2, vector, out, ab_op, false);
-			mo1.release();
-		}
-		else // MM
-			federatedAggregateBinary(mo1, mo2, out);
-	}
-	
-	/**
-	 * Performs a federated binary aggregation (currently only MV and VM is supported).
-	 *
-	 * @param mo1 the first matrix object
-	 * @param mo2 the other matrix object
-	 * @param out output matrix object
-	 */
-	private static void federatedAggregateBinary(MatrixObject mo1, MatrixObject mo2, MatrixObject out) {
-		boolean distributeCols = false;
-		// if distributeCols = true we distribute cols of mo2 and do a MV multiplications, otherwise we
-		// distribute rows of mo1 and do VM multiplications
-		if (mo1.isFederated() && mo2.isFederated()) {
-			// both are federated -> distribute smaller matrix
-			// TODO do more in depth checks like: how many federated workers, how big is the actual data we send and so on
-			// maybe once we track number of non zeros we could use that to get a better estimation of how much data
-			// will be requested?
-			distributeCols = mo2.getNumColumns() * mo2.getNumRows() < mo1.getNumColumns() * mo1.getNumRows();
-		}
-		else if (mo2.isFederated() && !mo1.isFederated()) {
-			// Distribute mo1 which is not federated
-			distributeCols = true;
-		}
-		// TODO performance if both matrices are federated
-		Map<FederatedRange, FederatedData> mapping = distributeCols ? mo1.getFedMapping() : mo2.getFedMapping();
-		MatrixBlock matrixBlock = distributeCols ? mo2.acquireRead() : mo1.acquireRead();
-		ExecutorService pool = CommonThreadPool.get(mapping.size());
-		ArrayList<Pair<FederatedRange, MatrixBlock>> results = new ArrayList<>();
-		ArrayList<FederatedMMTask> tasks = new ArrayList<>();
-		for (Map.Entry<FederatedRange, FederatedData> fedMap : mapping.entrySet()) {
-			// this resultPair will contain both position of partial result and the partial result itself of the operations
-			MutablePair<FederatedRange, MatrixBlock> resultPair = new MutablePair<>();
-			// they all get references to the real block, the task slices out the needed part and does the
-			// multiplication, therefore they can share the object since we use it immutably
-			tasks.add(new FederatedMMTask(fedMap.getKey(), fedMap.getValue(), resultPair, matrixBlock, distributeCols));
-			results.add(resultPair);
-		}
-		CommonThreadPool.invokeAndShutdown(pool, tasks);
-		(distributeCols?mo2:mo1).release();
-		
-		// combine results
-		if (mo1.getNumRows() > Integer.MAX_VALUE || mo2.getNumColumns() > Integer.MAX_VALUE) {
-			throw new DMLRuntimeException("Federated matrix is too large for federated distribution");
-		}
-		out.acquireModify(combinePartialMMResults(results, (int) mo1.getNumRows(), (int) mo2.getNumColumns()));
-		out.release();
-	}
-	
-	private static MatrixBlock combinePartialMMResults(ArrayList<Pair<FederatedRange, MatrixBlock>> results, 
-		int rows, int cols) {
-		// TODO support large blocks with > int size
-		MatrixBlock resultBlock = new MatrixBlock(rows, cols, false);
-		for (Pair<FederatedRange, MatrixBlock> partialResult : results) {
-			FederatedRange range = partialResult.getLeft();
-			MatrixBlock partialBlock = partialResult.getRight();
-			int[] dimsLower = range.getBeginDimsInt();
-			int[] dimsUpper = range.getEndDimsInt();
-			resultBlock.copy(dimsLower[0], dimsUpper[0] - 1, dimsLower[1], dimsUpper[1] - 1, partialBlock, false);
-		}
-		resultBlock.recomputeNonZeros();
-		return resultBlock;
-	}
-	
-	/**
-	 * Performs a federated binary aggregation on a Matrix and Vector.
-	 *
-	 * @param fedMo          the federated matrix object
-	 * @param vector         the vector
-	 * @param output         the output matrix object
-	 * @param op             the operation
-	 * @param matrixVectorOp true if matrix vector operation, false if vector matrix op
-	 */
-	public static void federatedAggregateBinaryMV(MatrixObject fedMo, MatrixBlock vector, MatrixObject output,
-			AggregateBinaryOperator op, boolean matrixVectorOp) {
-		if (!(op.binaryFn instanceof Multiply && op.aggOp.increOp.fn instanceof Plus))
-			throw new DMLRuntimeException("Only matrix-vector is supported for federated binary aggregation");
-		// fixed implementation only for mv, vm multiply and plus
-		// TODO move this to a Lib class?
-		
-		// create output matrix
-		MatrixBlock resultBlock;
-		// if we change the order of parameters, so VM instead of MV, the output has different dimensions
-		if (!matrixVectorOp) {
-			output.getDataCharacteristics().setRows(1).setCols(fedMo.getNumColumns());
-			resultBlock = new MatrixBlock(1, (int) fedMo.getNumColumns(), false);
-		}
-		else {
-			output.getDataCharacteristics().setRows(fedMo.getNumRows()).setCols(1);
-			resultBlock = new MatrixBlock((int) fedMo.getNumRows(), 1, false);
-		}
-		List<Pair<FederatedRange, Future<FederatedResponse>>> idResponsePairs = new ArrayList<>();
-		// TODO parallel for loop (like on lines 125-136)
-		for (Map.Entry<FederatedRange, FederatedData> entry : fedMo.getFedMapping().entrySet()) {
-			FederatedRange range = entry.getKey();
-			FederatedData fedData = entry.getValue();
-			Future<FederatedResponse> future = executeMVMultiply(range, fedData, vector, matrixVectorOp);
-			idResponsePairs.add(new ImmutablePair<>(range, future));
-		}
-		try {
-			for (Pair<FederatedRange, Future<FederatedResponse>> idResponsePair : idResponsePairs) {
-				FederatedRange range = idResponsePair.getLeft();
-				FederatedResponse federatedResponse = idResponsePair.getRight().get();
-				combinePartialMVResults(range, federatedResponse, resultBlock, matrixVectorOp);
-			}
-		}
-		catch (Exception e) {
-			throw new DMLRuntimeException("Federated binary aggregation failed", e);
-		}
-		long nnz = resultBlock.recomputeNonZeros();
-		output.acquireModify(resultBlock);
-		output.getDataCharacteristics().setNonZeros(nnz);
-		output.release();
-	}
-	
-	private static void combinePartialMVResults(FederatedRange range,
-		FederatedResponse federatedResponse, MatrixBlock resultBlock, boolean matrixVectorOp)
-	{
-		try {
-			int[] beginDims = range.getBeginDimsInt();
-			MatrixBlock mb = (MatrixBlock) federatedResponse.getData()[0];
-			// TODO performance optimizations
-			// TODO Improve Vector Matrix multiplication accuracy: An idea would be to make use of kahan plus here,
-			//  this should improve accuracy a bit, although we still lose out on the small error lost on the worker
-			//  without having to return twice the amount of data (value + sum error)
-			// Add worker response to resultBlock
-			for (int r = 0; r < mb.getNumRows(); r++)
-				for (int c = 0; c < mb.getNumColumns(); c++) {
-					int resultRow = r + (!matrixVectorOp ? 0 : beginDims[0]);
-					int resultColumn = c + (!matrixVectorOp ? beginDims[1] : 0);
-					resultBlock.quickSetValue(resultRow, resultColumn,
-						resultBlock.quickGetValue(resultRow, resultColumn) + mb.quickGetValue(r, c));
-				}
-		} catch (Exception e){
-			throw new DMLRuntimeException("Combine partial results from federated matrix failed.", e);
-		}
-	}
-	
-	private static Future<FederatedResponse> executeMVMultiply(FederatedRange range,
-		FederatedData fedData, MatrixBlock vector, boolean matrixVectorOp)
-	{
-		if (!fedData.isInitialized()) {
-			throw new DMLRuntimeException("Not all FederatedData was initialized for federated matrix");
-		}
-		int[] beginDimsInt = range.getBeginDimsInt();
-		int[] endDimsInt = range.getEndDimsInt();
-		// params for federated request
-		List<Object> params = new ArrayList<>();
-		// we broadcast the needed part of the small vector
-		MatrixBlock vectorSlice;
-		if (!matrixVectorOp) {
-			// if we the size already is ok, we do not have to copy a slice
-			int length = endDimsInt[0] - beginDimsInt[0];
-			if (vector.getNumColumns() == length) {
-				vectorSlice = vector;
-			}
-			else {
-				vectorSlice = new MatrixBlock(1, length, false);
-				vector.slice(0, 0, beginDimsInt[0], endDimsInt[0] - 1, vectorSlice);
-			}
-		}
-		else {
-			int length = endDimsInt[1] - beginDimsInt[1];
-			if (vector.getNumRows() == length) {
-				vectorSlice = vector;
-			}
-			else {
-				vectorSlice = new MatrixBlock(length, 1, false);
-				vector.slice(beginDimsInt[1], endDimsInt[1] - 1, 0, 0, vectorSlice);
-			}
-		}
-		params.add(vectorSlice);
-		params.add(matrixVectorOp); // if is matrix vector multiplication true, otherwise false
-		return fedData.executeFederatedOperation(
-			new FederatedRequest(FederatedRequest.FedMethod.MATVECMULT, params), true);
-	}
-	
-	private static class FederatedMMTask implements Callable<Void> {
-		private FederatedRange _range;
-		private FederatedData _data;
-		private MutablePair<FederatedRange, MatrixBlock> _result;
-		private MatrixBlock _otherMatrix;
-		private boolean _distributeCols;
-		
-		public FederatedMMTask(FederatedRange range, FederatedData fedData,
-			MutablePair<FederatedRange, MatrixBlock> result, MatrixBlock otherMatrix, boolean distributeCols)
-		{
-			_range = range;
-			_data = fedData;
-			_result = result;
-			_otherMatrix = otherMatrix;
-			_distributeCols = distributeCols;
-		}
-		
-		@Override
-		public Void call() throws Exception {
-			if (_distributeCols)
-				executeColWiseMVMultiplication();
-			else
-				executeRowWiseVMMultiplications();
-			return null;
-		}
-		
-		/**
-		 * Distribute the non or smaller federated block as row vectors to the federated worker and do row number of
-		 * times a vector-matrix multiplication. Non or smaller federated block is left operand.
-		 *
-		 * @throws InterruptedException if .get() on federated response future fails -> interrupted
-		 * @throws ExecutionException   if .get() on federated response future fails -> execution failed
-		 */
-		private void executeRowWiseVMMultiplications() throws InterruptedException, ExecutionException {
-			MatrixBlock result;
-			// TODO support large matrices with long indexes
-			int[] beginDims = _range.getBeginDimsInt();
-			int[] endDims = _range.getEndDimsInt();
-			// we take all rows but only the columns between the rows of the federated block of the other block (left
-			// hand side of the calculation).
-			int rowsBeginOtherBlock = 0;
-			int colsBeginOtherBlock = beginDims[0];
-			int rowsEndOtherBlock = _otherMatrix.getNumRows();
-			int colsEndOtherBlock = endDims[0];
-			// Size of partial result block for vm is rows of otherBlock * cols of federatedData
-			result = new MatrixBlock(rowsEndOtherBlock - rowsBeginOtherBlock, endDims[1] - beginDims[1], false);
-			// Set range of output in result block, rows are the number of rows of the other block, while columns
-			// are the number of columns of our federated data
-			_result.setLeft(new FederatedRange(new long[] {rowsBeginOtherBlock, beginDims[1]},
-					new long[] {rowsEndOtherBlock, endDims[1]}));
-			// vector which we will distribute otherBlock.rows number of times
-			MatrixBlock vec = new MatrixBlock(1, colsEndOtherBlock - colsBeginOtherBlock, false);
-			for (int r = rowsBeginOtherBlock; r < rowsEndOtherBlock; r++) {
-				// slice row vector out of other matrix which we will send to federated worker
-				_otherMatrix.slice(r, r, colsBeginOtherBlock, colsEndOtherBlock - 1, vec);
-				// TODO experiment if sending multiple requests at the same time to the same worker increases
-				//  performance (remove get and do multithreaded?)
-				FederatedResponse response = executeMVMultiply(_range, _data, vec, _distributeCols).get();
-				try{
-					result.copy(r, r, 0, endDims[1] - beginDims[1] - 1, (MatrixBlock) response.getData()[0], true);
-				} catch (Exception e) {
-					throw new DMLRuntimeException(
-						"Federated Matrix-Matrix Multiplication failed: ", e);
-				}		
-			}
-			_result.setRight(result);
-		}
+		MatrixObject mo1 = ec.getMatrixObject(input1);
+		MatrixObject mo2 = ec.getMatrixObject(input2);
 		
-		/**
-		 * Distribute the non or smaller federated block as col vectors to the federated worker and do column number of
-		 * times a matrix-vector multiplication. Non or smaller federated block is right operand.
-		 *
-		 * @throws InterruptedException if .get() on federated response future fails -> interrupted
-		 * @throws ExecutionException   if .get() on federated response future fails -> execution failed
-		 */
-		private void executeColWiseMVMultiplication()
-				throws InterruptedException, ExecutionException {
-			MatrixBlock result;
-			// TODO support large matrices with long indexes
-			int[] beginDims = _range.getBeginDimsInt();
-			int[] endDims = _range.getEndDimsInt();
-			// we take all columns but only the rows between the columns of the federated block of the other block (right
-			// hand side of the calculation).
-			int rowsBeginOtherBlock = beginDims[1];
-			int colsBeginOtherBlock = 0;
-			int rowsEndOtherBlock = endDims[1];
-			int colsEndOtherBlock = _otherMatrix.getNumColumns();
-			// Size of partial result block for mv is rows of federated block * cols of other block
-			result = new MatrixBlock(endDims[0] - beginDims[0], colsEndOtherBlock - colsBeginOtherBlock, false);
-			// Set range of output in result block, rows are the number of rows of the federated data, while columns
-			// are the number of columns of the other block
-			_result.setLeft(new FederatedRange(new long[] {beginDims[0], colsBeginOtherBlock},
-					new long[] {endDims[0], colsEndOtherBlock}));
-			// vector which we will distribute otherBlock.cols number of times
-			MatrixBlock vec = new MatrixBlock(rowsEndOtherBlock - rowsBeginOtherBlock, 1, false);
-			for (int c = colsBeginOtherBlock; c < colsEndOtherBlock; c++) {
-				// slice column vector out of other matrix which we will send to federated worker
-				_otherMatrix.slice(rowsBeginOtherBlock, rowsEndOtherBlock - 1, c, c, vec);
-				// TODO experiment if sending multiple requests at the same time to the same worker increases
-				//  performance
-				FederatedResponse response = executeMVMultiply(_range, _data, vec, _distributeCols).get();
-				try {
-					result.copy(0, endDims[0] - beginDims[0] - 1, c, c, (MatrixBlock) response.getData()[0], true);
-				} catch (Exception e){
-					throw new DMLRuntimeException(
-						"Federated Matrix-Matrix Multiplication failed: ", e);
-				}
-				
-			}
-			_result.setRight(result);
+		//#1 federated matrix-vector multiplication
+		if(mo1.isFederated()) { // MV + MM
+			//construct commands: broadcast rhs, fed mv, retrieve results
+			FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+				new CPOperand[]{input1, input2}, new long[]{mo1.getFedMapping().getID(), fr1.getID()});
+			FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
+			//execute federated operations and aggregate
+			Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(fr1, fr2, fr3);
+			MatrixBlock ret = FederationUtils.rbind(tmp);
+			mo1.getFedMapping().cleanup(fr1.getID(), fr2.getID());
+			ec.setMatrixOutput(output.getName(), ret);
+			//TODO should remain federated matrix (no need for agg)
+		}
+		//#2 vector - federated matrix multiplication
+		else if (mo2.isFederated()) {// VM + MM
+			//construct commands: broadcast rhs, fed mv, retrieve results
+			FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, true);
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+				new CPOperand[]{input1, input2}, new long[]{fr1[0].getID(), mo2.getFedMapping().getID()});
+			FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
+			//execute federated operations and aggregate
+			Future<FederatedResponse>[] tmp = mo2.getFedMapping().execute(fr1, fr2, fr3);
+			MatrixBlock ret = FederationUtils.aggAdd(tmp);
+			mo2.getFedMapping().cleanup(fr1[0].getID(), fr2.getID());
+			ec.setMatrixOutput(output.getName(), ret);
+		}
+		else { //other combinations
+			throw new DMLRuntimeException("Federated AggregateBinary not supported with the "
+				+ "following federated objects: "+mo1.isFederated()+" "+mo2.isFederated());
 		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
index 3b9f57c..e5dd81e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
@@ -19,23 +19,22 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
-import org.apache.sysds.common.Types;
-import org.apache.sysds.common.Types.DataType;
-import org.apache.sysds.conf.ConfigurationManager;
-import org.apache.sysds.runtime.DMLRuntimeException;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.LibFederatedAgg;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.instructions.cp.DoubleObject;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 
 public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 	
-	private AggregateUnaryFEDInstruction(AggregateUnaryOperator auop, AggregateOperator aop, CPOperand in,
+	private AggregateUnaryFEDInstruction(AggregateUnaryOperator auop, CPOperand in,
 			CPOperand out, String opcode, String istr) {
 		super(FEDType.AggregateUnary, auop, in, out, opcode, istr);
 	}
@@ -45,37 +44,28 @@ public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 		String opcode = parts[0];
 		CPOperand in1 = new CPOperand(parts[1]);
 		CPOperand out = new CPOperand(parts[2]);
-		String aopcode = InstructionUtils.deriveAggregateOperatorOpcode(opcode);
-		Types.CorrectionLocationType corrLoc = InstructionUtils
-			.deriveAggregateOperatorCorrectionLocation(opcode);
 		AggregateUnaryOperator aggun = InstructionUtils.parseBasicAggregateUnaryOperator(opcode);
-		AggregateOperator aop = InstructionUtils.parseAggregateOperator(aopcode, corrLoc.toString());
-		return new AggregateUnaryFEDInstruction(aggun, aop, in1, out, opcode, str);
+		if(InstructionUtils.getExecType(str) == ExecType.SPARK)
+			str = InstructionUtils.replaceOperand(str, 4, "-1");
+		return new AggregateUnaryFEDInstruction(aggun, in1, out, opcode, str);
 	}
 	
 	@Override
 	public void processInstruction(ExecutionContext ec) {
-		String output_name = output.getName();
-		String opcode = getOpcode();
+		AggregateUnaryOperator aop = (AggregateUnaryOperator) _optr;
+		MatrixObject in = ec.getMatrixObject(input1);
+		
+		//create federated commands for aggregation
+		FederatedRequest fr1 = FederationUtils.callInstruction(instString, output,
+			new CPOperand[]{input1}, new long[]{in.getFedMapping().getID()});
+		FederatedRequest fr2 = new FederatedRequest(RequestType.GET_VAR, fr1.getID());
 		
-		AggregateUnaryOperator au_op = (AggregateUnaryOperator) _optr;
-		MatrixObject matrixObject;
-		if (input1.getDataType() == DataType.MATRIX &&
-				(matrixObject = ec.getMatrixObject(input1.getName())).isFederated()) {
-			MatrixBlock outMatrix = LibFederatedAgg.aggregateUnaryMatrix(matrixObject, au_op);
-			
-			if (output.getDataType() == DataType.SCALAR) {
-				DoubleObject ret = new DoubleObject(outMatrix.getValue(0, 0));
-				ec.setScalarOutput(output_name, ret);
-			}
-			else {
-				ec.setMatrixOutput(output_name, outMatrix);
-				ec.getMatrixObject(output_name).getDataCharacteristics()
-					.setBlocksize(ConfigurationManager.getBlocksize());
-			}
-		}
-		else {
-			throw new DMLRuntimeException(opcode + " only supported on federated matrix.");
-		}
+		//execute federated commands and cleanups
+		Future<FederatedResponse>[] tmp = in.getFedMapping().execute(fr1, fr2);
+		in.getFedMapping().cleanup(fr1.getID());
+		if( output.isScalar() )
+			ec.setVariable(output.getName(), FederationUtils.aggScalar(aop, tmp));
+		else
+			ec.setMatrixOutput(output.getName(), FederationUtils.aggMatrix(aop, tmp));
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
index d770063..8fed7f7 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
@@ -22,27 +22,22 @@ package org.apache.sysds.runtime.instructions.fed;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.LibFederatedAppend;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.functionobjects.OffsetColumnIndex;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
 
 public class AppendFEDInstruction extends BinaryFEDInstruction {
-	public enum FEDAppendType {
-		CBIND, RBIND;
-		public boolean isCBind() {
-			return this == CBIND;
-		}
-	}
-	
-	protected final FEDAppendType _type;
+	protected boolean _cbind; //otherwise rbind
 	
-	protected AppendFEDInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, FEDAppendType type,
-			String opcode, String istr) {
+	protected AppendFEDInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out,
+		boolean cbind, String opcode, String istr) {
 		super(FEDType.Append, op, in1, in2, out, opcode, istr);
-		_type = type;
+		_cbind = cbind;
 	}
 	
 	public static AppendFEDInstruction parseInstruction(String str) {
@@ -55,36 +50,53 @@ public class AppendFEDInstruction extends BinaryFEDInstruction {
 		CPOperand out = new CPOperand(parts[parts.length - 2]);
 		boolean cbind = Boolean.parseBoolean(parts[parts.length - 1]);
 		
-		FEDAppendType type = cbind ? FEDAppendType.CBIND : FEDAppendType.RBIND;
-		
-		if (!opcode.equalsIgnoreCase("append") && !opcode.equalsIgnoreCase("remove") 
-			&& !opcode.equalsIgnoreCase("galignedappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a AppendCPInstruction: " + str);
-		
 		Operator op = new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1));
-		return new AppendFEDInstruction(op, in1, in2, out, type, opcode, str);
+		return new AppendFEDInstruction(op, in1, in2, out, cbind, opcode, str);
 	}
 	
 	@Override
 	public void processInstruction(ExecutionContext ec) {
 		//get inputs
-		MatrixObject matObject1 = ec.getMatrixObject(input1.getName());
-		MatrixObject matObject2 = ec.getMatrixObject(input2.getName());
+		MatrixObject mo1 = ec.getMatrixObject(input1.getName());
+		MatrixObject mo2 = ec.getMatrixObject(input2.getName());
+		DataCharacteristics dc1 = mo1.getDataCharacteristics();
+		DataCharacteristics dc2 = mo1.getDataCharacteristics();
+		
 		//check input dimensions
-		if (_type == FEDAppendType.CBIND && matObject1.getNumRows() != matObject2.getNumRows()) {
+		if (_cbind && mo1.getNumRows() != mo2.getNumRows()) {
 			throw new DMLRuntimeException(
 				"Append-cbind is not possible for federated input matrices " + input1.getName() + " and "
-				+ input2.getName() + " with different number of rows: " + matObject1.getNumRows() + " vs "
-				+ matObject2.getNumRows());
+				+ input2.getName() + " with different number of rows: " + mo1.getNumRows() + " vs "
+				+ mo2.getNumRows());
 		}
-		else if (_type == FEDAppendType.RBIND && matObject1.getNumColumns() != matObject2.getNumColumns()) {
+		else if (!_cbind && mo1.getNumColumns() != mo2.getNumColumns()) {
 			throw new DMLRuntimeException(
 				"Append-rbind is not possible for federated input matrices " + input1.getName() + " and "
-				+ input2.getName() + " with different number of columns: " + matObject1.getNumColumns()
-				+ " vs " + matObject2.getNumColumns());
+				+ input2.getName() + " with different number of columns: " + mo1.getNumColumns()
+				+ " vs " + mo2.getNumColumns());
+		}
+		
+		if( mo1.isFederated() && _cbind ) {
+			FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+				new CPOperand[]{input1, input2}, new long[]{mo1.getFedMapping().getID(), fr1.getID()});
+			mo1.getFedMapping().execute(fr1, fr2);
+			//derive new fed mapping for output
+			MatrixObject out = ec.getMatrixObject(output);
+			out.getDataCharacteristics().set(dc1.getRows(), dc1.getCols()+dc2.getCols(),
+				dc1.getBlocksize(), dc1.getNonZeros()+dc2.getNonZeros());
+			out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr2.getID()));
+		}
+		else if( mo1.isFederated() && mo2.isFederated() && !_cbind ) {
+			MatrixObject out = ec.getMatrixObject(output);
+			out.getDataCharacteristics().set(dc1.getRows()+dc2.getRows(), dc1.getCols(),
+				dc1.getBlocksize(), dc1.getNonZeros()+dc2.getNonZeros());
+			long id = FederationUtils.getNextFedDataID();
+			out.setFedMapping(mo1.getFedMapping().copyWithNewID(id).rbind(dc1.getRows(), mo2.getFedMapping()));
+		}
+		else { //other combinations
+			throw new DMLRuntimeException("Federated AggregateBinary not supported with the "
+				+ "following federated objects: "+mo1.isFederated()+" "+mo2.isFederated());
 		}
-		// append MatrixObjects
-		LibFederatedAppend.federateAppend(matObject1, matObject2,
-			ec.getMatrixObject(output.getName()), _type.isCBind());
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
index 3c9db11..0e05ca8 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
@@ -19,25 +19,12 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.instructions.cp.ScalarObject;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
 
 public class BinaryMatrixScalarFEDInstruction extends BinaryFEDInstruction
 {
@@ -48,47 +35,25 @@ public class BinaryMatrixScalarFEDInstruction extends BinaryFEDInstruction
 
 	@Override
 	public void processInstruction(ExecutionContext ec) {
-		MatrixObject matrix = ec.getMatrixObject(input1.isMatrix() ? input1 : input2);
-		ScalarObject scalar = ec.getScalarInput(input2.isScalar() ? input2 : input1);
-
-		ScalarOperator sc_op = (ScalarOperator) _optr;
-		sc_op = sc_op.setConstant(scalar.getDoubleValue());
-
-		if (!matrix.isFederated())
-			throw new DMLRuntimeException("Trying to execute federated operation on non federated matrix");
-
-		MatrixBlock ret = new MatrixBlock((int)matrix.getNumRows(), (int) matrix.getNumColumns(), false);
-		try {
-			//Keep track on federated execution ond matrix shards
-			List<Pair<FederatedRange, Future<FederatedResponse>>> idResponsePairs = new ArrayList<>();
-
-			//execute federated operation
-			for (Map.Entry<FederatedRange, FederatedData> entry : matrix.getFedMapping().entrySet()) {
-				FederatedData shard = entry.getValue();
-				if (!shard.isInitialized())
-					throw new DMLRuntimeException("Not all FederatedData was initialized for federated matrix");
-				Future<FederatedResponse> future = shard.executeFederatedOperation(
-					new FederatedRequest(FederatedRequest.FedMethod.SCALAR, sc_op), true);
-				idResponsePairs.add(new ImmutablePair<>(entry.getKey(), future));
-			}
-
-			for (Pair<FederatedRange, Future<FederatedResponse>> idResponsePair : idResponsePairs) {
-				FederatedRange range = idResponsePair.getLeft();
-				//wait for fed workers finishing their work
-				FederatedResponse federatedResponse = idResponsePair.getRight().get();
-
-				MatrixBlock shard = (MatrixBlock) federatedResponse.getData()[0];
-				ret.copy(range.getBeginDimsInt()[0], range.getEndDimsInt()[0]-1,
-					range.getBeginDimsInt()[1], range.getEndDimsInt()[1]-1, shard, false);
-			}
-		}
-		catch (Exception e) {
-			throw new DMLRuntimeException("Federated binary operation failed", e);
-		}
-
-		if(ret.getNumRows() != matrix.getNumRows() || ret.getNumColumns() != matrix.getNumColumns())
-			throw new DMLRuntimeException("Federated binary operation returns invalid matrix dimension");
+		CPOperand matrix = input1.isMatrix() ? input1 : input2;
+		CPOperand scalar = input2.isScalar() ? input2 : input1;
+		MatrixObject mo = ec.getMatrixObject(matrix);
+		
+		//execute federated matrix-scalar operation and cleanups
+		FederatedRequest fr1 = !scalar.isLiteral() ?
+			mo.getFedMapping().broadcast(ec.getScalarInput(scalar)) : null;
+		FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+			new CPOperand[]{matrix, (fr1 != null)?scalar:null},
+			new long[]{mo.getFedMapping().getID(), (fr1 != null)?fr1.getID():-1});
+		
+		mo.getFedMapping().execute((fr1!=null) ?
+			new FederatedRequest[]{fr1, fr2}: new FederatedRequest[]{fr2});
+		if( fr1 != null )
+			mo.getFedMapping().cleanup(fr1.getID());
 		
-		ec.setMatrixOutput(output.getName(), ret);
+		//derive new fed mapping for output
+		MatrixObject out = ec.getMatrixObject(output);
+		out.getDataCharacteristics().set(mo.getDataCharacteristics());
+		out.setFedMapping(mo.getFedMapping().copyWithNewID(fr2.getID()));
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index e4b7dac..8d050b3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.instructions.fed;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
@@ -30,6 +31,8 @@ import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
@@ -103,7 +106,6 @@ public class InitFEDInstruction extends FEDInstruction {
 		for (int i = 0; i < addresses.getLength(); i++) {
 			Data addressData = addresses.getData().get(i);
 			if (addressData instanceof StringObject) {
-
 				// We split address into url/ip, the port and file path of file to read
 				String[] parsedValues = parseURL(((StringObject) addressData).getStringValue());
 				String host = parsedValues[0];
@@ -136,7 +138,6 @@ public class InitFEDInstruction extends FEDInstruction {
 				catch (UnknownHostException e) {
 					throw new DMLRuntimeException("federated host was unknown: " + host);
 				}
-
 			}
 			else {
 				throw new DMLRuntimeException("federated instruction only takes strings as addresses");
@@ -206,6 +207,7 @@ public class InitFEDInstruction extends FEDInstruction {
 			fedMapping.put(t.getLeft(), t.getRight());
 		}
 		List<Pair<FederatedData, Future<FederatedResponse>>> idResponses = new ArrayList<>();
+		long id = FederationUtils.getNextFedDataID();
 		for (Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
 			FederatedRange range = entry.getKey();
 			FederatedData value = entry.getValue();
@@ -217,7 +219,7 @@ public class InitFEDInstruction extends FEDInstruction {
 					dims[i] = endDims[i] - beginDims[i];
 				}
 				// TODO check if all matrices have the same DataType (currently only double is supported)
-				idResponses.add(new ImmutablePair<>(value, value.initFederatedData()));
+				idResponses.add(new ImmutablePair<>(value, value.initFederatedData(id)));
 			}
 		}
 		try {
@@ -230,7 +232,8 @@ public class InitFEDInstruction extends FEDInstruction {
 			throw new DMLRuntimeException("Federation initialization failed", e);
 		}
 		output.getDataCharacteristics().setNonZeros(output.getNumColumns() * output.getNumRows());
-		output.setFedMapping(fedMapping);
+		output.getDataCharacteristics().setBlocksize(ConfigurationManager.getBlocksize());
+		output.setFedMapping(new FederationMap(id, fedMapping));
 	}
 	
 	public void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers) {
@@ -242,6 +245,7 @@ public class InitFEDInstruction extends FEDInstruction {
 		// on the distributed workers. We need the FederatedData, the starting column of the sub frame (for the schema)
 		// and the future for the response
 		List<Pair<FederatedData, Pair<Integer, Future<FederatedResponse>>>> idResponses = new ArrayList<>();
+		long id = FederationUtils.getNextFedDataID();
 		for (Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
 			FederatedRange range = entry.getKey();
 			FederatedData value = entry.getValue();
@@ -252,7 +256,7 @@ public class InitFEDInstruction extends FEDInstruction {
 				for (int i = 0; i < dims.length; i++) {
 					dims[i] = endDims[i] - beginDims[i];
 				}
-				idResponses.add(new ImmutablePair<>(value, new ImmutablePair<>((int) beginDims[1], value.initFederatedData())));
+				idResponses.add(new ImmutablePair<>(value, new ImmutablePair<>((int) beginDims[1], value.initFederatedData(id))));
 			}
 		}
 		// columns are definitely in int range, because we throw an DMLRuntime Exception in `processInstruction` else
@@ -271,7 +275,7 @@ public class InitFEDInstruction extends FEDInstruction {
 		}
 		output.getDataCharacteristics().setNonZeros(output.getNumColumns() * output.getNumRows());
 		output.setSchema(schema);
-		output.setFedMapping(fedMapping);
+		output.setFedMapping(new FederationMap(id, fedMapping));
 	}
 	
 	private static void handleFedFrameResponse(Types.ValueType[] schema, FederatedData federatedData,
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateUnaryOperator.java b/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateUnaryOperator.java
index 6d0ca53..a1faae0 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateUnaryOperator.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateUnaryOperator.java
@@ -26,6 +26,8 @@ import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.Or;
 import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.functionobjects.ReduceCol;
+import org.apache.sysds.runtime.functionobjects.ReduceRow;
 
 
 public class AggregateUnaryOperator  extends Operator 
@@ -58,4 +60,12 @@ public class AggregateUnaryOperator  extends Operator
 	public int getNumThreads(){
 		return k;
 	}
+	
+	public boolean isRowAggregate() {
+		return indexFn instanceof ReduceCol;
+	}
+	
+	public boolean isColAggregate() {
+		return indexFn instanceof ReduceRow;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index ad0b6d7..77149d1 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -24,19 +24,12 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
@@ -801,23 +794,4 @@ public class UtilFunctions {
 				break;
 		}
 	}
-	
-	public static List<org.apache.commons.lang3.tuple.Pair<FederatedRange, Future<FederatedResponse>>> requestFederatedData(
-		Map<FederatedRange, FederatedData> fedMapping) {
-		List<org.apache.commons.lang3.tuple.Pair<FederatedRange, Future<FederatedResponse>>> readResponses = new ArrayList<>();
-		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
-			FederatedRange range = entry.getKey();
-			FederatedData fd = entry.getValue();
-
-			if(fd.isInitialized()) {
-				FederatedRequest request = new FederatedRequest(FederatedRequest.FedMethod.TRANSFER);
-				Future<FederatedResponse> readResponse = fd.executeFederatedOperation(request, true);
-				readResponses.add(new ImmutablePair<>(range, readResponse));
-			}
-			else {
-				throw new DMLRuntimeException("Federated matrix read only supported on initialized FederatedData");
-			}
-		}
-		return readResponses;
-	}
 }
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 8c1973b..bf62c34 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -1200,7 +1200,7 @@ public abstract class AutomatedTestBase {
 			if(exceptionExpected)
 				fail("expected exception which has not been raised: " + expectedException);
 		}
-		catch(Exception e) {
+		catch(Exception | Error e) {
 			if( !outputBuffering )
 				e.printStackTrace();
 			if(errMessage != null && !errMessage.equals("")) {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
index 340234b..b4ce148 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
@@ -117,12 +117,10 @@ public class FederatedConstructionTest extends AutomatedTestBase {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
-
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Thread t = startLocalFedWorker(port);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
index 67a74be..7968dd7 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
@@ -77,8 +77,6 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
 
-		Thread t1, t2;
-
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
@@ -98,8 +96,8 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		t1 = startLocalFedWorker(port1);
-		t2 = startLocalFedWorker(port2);
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
index 93e1feb..81c0b00 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
@@ -48,7 +48,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {{1, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}});
+		//TODO add tests and support of aligned blocksized (which is however a special case)
+		return Arrays.asList(new Object[][] {{1, 1001}, {10, 100}, {100, 10}, {1001, 1}, {10, 2001}, {2001, 10}});
 	}
 
 	@Override
@@ -71,8 +72,6 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
-
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
@@ -80,7 +79,7 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols));
 
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Thread t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
index 3118f01..69a743c 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
@@ -72,15 +72,13 @@ public class FederatedSumTest extends AutomatedTestBase {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
-
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		double[][] A = getRandomMatrix(rows / 2, cols, -10, 10, 1, 1);
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows / 2, cols, blocksize, (rows / 2) * cols));
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Thread t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/scripts/functions/federated/FederatedSumTest.dml b/src/test/scripts/functions/federated/FederatedSumTest.dml
index 8a8efb3..37a19f6 100644
--- a/src/test/scripts/functions/federated/FederatedSumTest.dml
+++ b/src/test/scripts/functions/federated/FederatedSumTest.dml
@@ -23,6 +23,7 @@ A = federated(addresses=list($in, $in), ranges=list(list(0, 0), list($rows / 2,
 s = sum(A)
 r = rowSums(A)
 c = colSums(A)
+
 write(s, $out_S)
 write(r, $out_R)
 write(c, $out_C)
diff --git a/src/test/scripts/functions/federated/matrix_scalar/FederatedMatrixAdditionScalar.dml b/src/test/scripts/functions/federated/matrix_scalar/FederatedMatrixAdditionScalar.dml
index 9468448..6f1dac3 100644
--- a/src/test/scripts/functions/federated/matrix_scalar/FederatedMatrixAdditionScalar.dml
+++ b/src/test/scripts/functions/federated/matrix_scalar/FederatedMatrixAdditionScalar.dml
@@ -25,4 +25,5 @@
 M = federated(addresses=list($in), ranges=list(list(0, 0), list($rows, $cols)))
 S = $scalar
 R = M + S
+
 write(R, $out)