You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/01/20 18:53:22 UTC

[systemds] branch main updated: [SYSTEMDS-3206] Federated Execution w/ Misaligned Matrices

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 97a8a4a  [SYSTEMDS-3206] Federated Execution w/ Misaligned Matrices
97a8a4a is described below

commit 97a8a4a91324bfa4085e6b33310a02215a26a6af
Author: ywcb00 <yw...@ywcb.org>
AuthorDate: Wed Sep 8 19:01:24 2021 +0200

    [SYSTEMDS-3206] Federated Execution w/ Misaligned Matrices
    
    This commit modifies the Federated Execution of:
    
    - Aggregate Binary Fed
    - Append Fed
    - Binary Matrix Matrix Fed
    - Fed Instruction Utils
    - Fed QuaternaryWDivMMFed
    - Fed Spoof
    
    to allow for and test misaligned matrix inputs, i.e., inputs whose
    federated sites are not aligned across operations. Also included is the
    removal of some code across the federated instructions. In detail:
    
    - remove unnecessary creation of extra thread in federated instruction
    - create federated output in the case of MV with not partial and not forced local output
    - move the local aggregations of the partial results to a joint method
    - bundle the whole local aggregation branches together with the local result aggregation
    - add a test for the MV case with verification for creation of federated output
    - tests to verify the creation of federated output using a sum operation
    - create federated output in the case of MV with not partial and not forced local output
    - move the local aggregations of the partial results to a joint method
    - include the FType.BROADCAST federated data inside the federated case of acquireReadIntern()
    	NOTE: otherwise, MatrixObject.readBlobFromHDFS()
    	-> CacheableData.acquireReadAndRelease() -> CacheableData.acquireRead()
    	-> CacheableData.acquireReadIntern() -> MatrixObject.readBlobFromHDFS()
    	is forming an endless loop
    - include the FType.BROADCAST federated data inside the federated case of acquireReadIntern()
    - get the CacheableData from federated even if it's broadcast federated
      but the HDFS file and RDD handle do not exist
    - Add support for two misaligned federated matrices
    - extend fed tests by element-wise plus and greater operations (EW_PLUS, EW_GREATER)
    - modify the federation map according to the individual needs for the MV case
      and set the matrix characteristics according to the output dimensions
    
    Closes #1446
---
 .../controlprogram/caching/CacheableData.java      |   8 +-
 .../controlprogram/federated/FederationMap.java    |  19 ++
 .../fed/AggregateBinaryFEDInstruction.java         | 171 ++++++-----------
 .../instructions/fed/AppendFEDInstruction.java     |  76 +++++---
 .../fed/BinaryMatrixMatrixFEDInstruction.java      |  32 ++--
 .../instructions/fed/FEDInstructionUtils.java      |  36 +---
 .../runtime/instructions/fed/MMFEDInstruction.java | 189 +++++++-----------
 .../fed/QuaternaryWDivMMFEDInstruction.java        |  22 +--
 .../instructions/fed/SpoofFEDInstruction.java      |  26 +--
 ...egateTest.java => FederatedMisAlignedTest.java} | 212 +++++++++++----------
 .../federated/primitives/FederatedRCBindTest.java  |  13 +-
 .../primitives/FederatedRowAggregateTest.java      |  17 +-
 .../primitives/FederatedTokenizeTest.java          |   6 +-
 .../federated/FederatedMisAlignedTest.dml          |  88 +++++++++
 .../federated/FederatedMisAlignedTestReference.dml |  74 +++++++
 .../functions/federated/FederatedRCBindTest.dml    |   6 +-
 .../federated/FederatedRCBindTestReference.dml     |   5 +
 .../federated/aggregate/FederatedMMTest.dml        |  11 +-
 .../aggregate/FederatedMMTestReference.dml         |   7 +-
 19 files changed, 551 insertions(+), 467 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 773c92c..f9438be 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -355,6 +355,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		return _metaData.getDataCharacteristics();
 	}
 
+	public long getDim(int dim) {
+		return getDataCharacteristics().getDim(dim);
+	}
+
 	public long getNumRows() {
 		return getDataCharacteristics().getRows();
 	}
@@ -547,7 +551,9 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 					if( DMLScript.STATISTICS )
 						CacheStatistics.incrementLinHits();
 				}
-				else if( isFederatedExcept(FType.BROADCAST) ) {
+				else if( isFederatedExcept(FType.BROADCAST)
+					|| (isFederated(FType.BROADCAST) && !HDFSTool.existsFileOnHDFS(_hdfsFileName)
+						&& getRDDHandle() == null) ) {
 					_data = readBlobFromFederated(_fedMapping);
 
 					//mark for initial local write despite read operation
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index 680f608..7e3c101 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -583,6 +584,24 @@ public class FederationMap {
 		return this;
 	}
 
+	/**
+	 * Take the federated mapping and sets one dimension of all federated ranges
+	 * to the specified value.
+	 *
+	 * @param value      long value for setting the dimension
+	 * @param dim        indicates if the row (0) or column (1) dimension should be set to value
+	 * @return FederationMap with the modified federated ranges
+	 */
+	public FederationMap modifyFedRanges(long value, int dim) {
+		if(getType() == (dim == 0 ? FType.ROW : FType.COL))
+			throw new DMLRuntimeException("Federated ranges cannot be modified in the direction of its partitioning.");
+		IntStream.range(0, getFederatedRanges().length).forEach(i -> {
+			getFederatedRanges()[i].setBeginDim(dim, 0);
+			getFederatedRanges()[i].setEndDim(dim, value);
+		});
+		return this;
+	}
+
 	public FederationMap transpose() {
 		List<Pair<FederatedRange, FederatedData>> tmp = new ArrayList<>(_fedMap);
 		_fedMap.clear();
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index e78d4cd..48b31a1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -21,10 +21,10 @@ package org.apache.sysds.runtime.instructions.fed;
 
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
@@ -84,12 +84,7 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr1.getID(), ec);
 			}
 			else {
-				FederatedRequest fr2 = new FederatedRequest(RequestType.GET_VAR, fr1.getID());
-				FederatedRequest fr3 = mo2.getFedMapping().cleanup(getTID(), fr1.getID(), fr2.getID());
-				//execute federated operations and aggregate
-				Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3);
-				MatrixBlock ret = FederationUtils.aggAdd(tmp);
-				ec.setMatrixOutput(output.getName(), ret);
+				aggregateLocally(mo1.getFedMapping(), true, ec, fr1);
 			}
 		}
 		else if(mo1.isFederated(FType.ROW) || mo1.isFederated(FType.PART)) { // MV + MM
@@ -98,88 +93,37 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
 				new CPOperand[]{input1, input2},
 				new long[]{mo1.getFedMapping().getID(), fr1.getID()}, true);
-			if( mo2.getNumColumns() == 1 ) { //MV
-				if ( _fedOut.isForcedFederated() ){
-					mo1.getFedMapping().execute(getTID(), fr1, fr2);
-					if ( mo1.isFederated(FType.PART) )
-						setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-					else
-						setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
-					MatrixBlock ret;
-					if ( mo1.isFederated(FType.PART) )
-						ret = FederationUtils.aggAdd(tmp);
-					else
-						ret = FederationUtils.bind(tmp, false);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+
+			boolean isVector = mo2.getNumColumns() == 1;
+			boolean isPartOut = mo1.isFederated(FType.PART) || // MV and MM
+				(!isVector && mo2.isFederated(FType.PART)); // only MM
+			if(isPartOut && _fedOut.isForcedFederated()) {
+				mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
-			else { //MM
-				//execute federated operations and aggregate
-				if ( !_fedOut.isForcedLocal() ){
-					mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
-					if ( mo1.isFederated(FType.PART) || mo2.isFederated(FType.PART) )
-						setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-					else
-						setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
-					MatrixBlock ret;
-					if ( mo1.isFederated(FType.PART) )
-						ret = FederationUtils.aggAdd(tmp);
-					else
-						ret = FederationUtils.bind(tmp, false);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+			else if((_fedOut.isForcedFederated() || (!isVector && !_fedOut.isForcedLocal()))
+				&& !isPartOut) { // not creating federated output in the MV case for reasons of performance
+				mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+				setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
+			}
+			else {
+				aggregateLocally(mo1.getFedMapping(), mo1.isFederated(FType.PART), ec, fr1, fr2);
 			}
 		}
 		//#2 vector - federated matrix multiplication
 		else if (mo2.isFederated(FType.ROW)) {// VM + MM
-			if ( mo1.isFederated(FType.COL) && isAggBinaryFedAligned(mo1,mo2) ){
-				FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
-					new CPOperand[]{input1, input2},
-					new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()}, true);
-				if ( _fedOut.isForcedFederated() ){
-					// Partial aggregates (set fedmapping to the partial aggs)
-					mo2.getFedMapping().execute(getTID(), true, fr2);
-					setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo2.getFedMapping().execute(getTID(), fr2, fr3);
-					MatrixBlock ret = FederationUtils.aggAdd(tmp);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+			//construct commands: broadcast rhs, fed mv, retrieve results
+			FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, true);
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+				new CPOperand[]{input1, input2},
+				new long[]{fr1[0].getID(), mo2.getFedMapping().getID()}, true);
+			if ( _fedOut.isForcedFederated() ){
+				// Partial aggregates (set fedmapping to the partial aggs)
+				mo2.getFedMapping().execute(getTID(), true, fr1, fr2);
+				setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
 			else {
-				//construct commands: broadcast rhs, fed mv, retrieve results
-				FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, true);
-				FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
-					new CPOperand[]{input1, input2},
-					new long[]{fr1[0].getID(), mo2.getFedMapping().getID()}, true);
-				if ( _fedOut.isForcedFederated() ){
-					// Partial aggregates (set fedmapping to the partial aggs)
-					mo2.getFedMapping().execute(getTID(), true, fr1, fr2);
-					setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo2.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo2.getFedMapping().execute(getTID(), true, fr1, fr2, fr3, fr4);
-					MatrixBlock ret = FederationUtils.aggAdd(tmp);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+				aggregateLocally(mo2.getFedMapping(), true, ec, fr1, fr2);
 			}
 		}
 		//#3 col-federated matrix vector multiplication
@@ -195,12 +139,7 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
 			else {
-				FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
-				FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-				//execute federated operations and aggregate
-				Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
-				MatrixBlock ret = FederationUtils.aggAdd(tmp);
-				ec.setMatrixOutput(output.getName(), ret);
+				aggregateLocally(mo1.getFedMapping(), true, ec, fr1, fr2);
 			}
 		}
 		else { //other combinations
@@ -211,28 +150,6 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 	}
 
 	/**
-	 * Checks alignment of dimensions for the federated aggregate binary processing without broadcast.
-	 * If the begin and end ranges of mo1 has cols equal to the rows of the begin and end ranges of mo2,
-	 * the two inputs are aligned for the processing of the federated aggregate binary instruction without broadcasting.
-	 * @param mo1 input matrix object 1
-	 * @param mo2 input matrix object 2
-	 * @return true if the two inputs are aligned for aggregate binary processing without broadcasting
-	 */
-	private static boolean isAggBinaryFedAligned(MatrixObject mo1, MatrixObject mo2){
-		FederatedRange[] mo1FederatedRanges = mo1.getFedMapping().getFederatedRanges();
-		FederatedRange[] mo2FederatedRanges = mo2.getFedMapping().getFederatedRanges();
-		for ( int i = 0; i < mo1FederatedRanges.length; i++ ){
-			FederatedRange mo1FedRange = mo1FederatedRanges[i];
-			FederatedRange mo2FedRange = mo2FederatedRanges[i];
-
-			if ( mo1FedRange.getBeginDims()[1] != mo2FedRange.getBeginDims()[0]
-				|| mo1FedRange.getEndDims()[1] != mo2FedRange.getEndDims()[0])
-				return false;
-		}
-		return true;
-	}
-
-	/**
 	 * Sets the output with a federated mapping of overlapping partial aggregates.
 	 * @param federationMap federated map from which the federated metadata is retrieved
 	 * @param mo1 matrix object with number of rows used to set the number of rows of the output
@@ -263,4 +180,38 @@ public class AggregateBinaryFEDInstruction extends BinaryFEDInstruction {
 		out.getDataCharacteristics().set(mo1.getNumRows(), mo2.getNumColumns(), (int)mo1.getBlocksize());
 		out.setFedMapping(federationMap.copyWithNewID(outputID, mo2.getNumColumns()));
 	}
+
+	private void aggregateLocally(FederationMap fedMap, boolean aggAdd, ExecutionContext ec,
+		FederatedRequest... fr) {
+		aggregateLocally(fedMap, aggAdd, ec, null, fr);
+	}
+
+	/**
+	 * Get the partial results and aggregate the partial results locally
+	 * @param fedMap the federated mapping
+	 * @param aggAdd indicates whether to aggregate the results by addition or binding
+	 * @param ec execution context
+	 * @param frSliced the federated request array from a sliced broadcast
+	 * @param fr the previous federated requests
+	 * NOTE: the last federated request fr has to be the instruction call
+	 */
+	private void aggregateLocally(FederationMap fedMap, boolean aggAdd, ExecutionContext ec,
+		FederatedRequest[] frSliced, FederatedRequest... fr) {
+		long callInstID = fr[fr.length - 1].getID();
+		FederatedRequest frG = new FederatedRequest(RequestType.GET_VAR, callInstID);
+		FederatedRequest frC = fedMap.cleanup(getTID(), callInstID);
+		//execute federated operations and aggregate
+		Future<FederatedResponse>[] ffr;
+		if(frSliced != null)
+			ffr = fedMap.execute(getTID(), frSliced, ArrayUtils.addAll(fr, frG, frC));
+		else
+			ffr = fedMap.execute(getTID(), ArrayUtils.addAll(fr, frG, frC));
+
+		MatrixBlock ret;
+		if ( aggAdd )
+			ret = FederationUtils.aggAdd(ffr);
+		else
+			ret = FederationUtils.bind(ffr, false);
+		ec.setMatrixOutput(output.getName(), ret);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
index 15ba1f1..56671a1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
@@ -23,7 +23,9 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap.AlignType;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.functionobjects.OffsetColumnIndex;
@@ -88,52 +90,72 @@ public class AppendFEDInstruction extends BinaryFEDInstruction {
 		MatrixObject out = ec.getMatrixObject(output);
 		MetaDataUtils.updateAppendDataCharacteristics(dc1, dc2, out.getDataCharacteristics(), _cbind);
 		
-		// federated/federated
-		if( mo1.isFederated() && mo2.isFederated() 
-			&& mo1.getFedMapping().getType()==mo2.getFedMapping().getType()
-			&& !mo1.getFedMapping().isAligned(mo2.getFedMapping(), FederationMap.AlignType.valueOf(mo1.getFedMapping().getType().name()))
-		)
-		{
+		// federated/federated aligned
+		if( ((mo1.isFederated(FType.ROW) && mo2.isFederated(FType.ROW) && _cbind)
+				|| (mo1.isFederated(FType.COL) && mo2.isFederated(FType.COL) && !_cbind))
+			&& mo1.getFedMapping().isAligned(mo2.getFedMapping(), mo1.isFederated(FType.ROW) ? AlignType.ROW : AlignType.COL)) {
+			boolean isSpark = instString.contains("SPARK");
+
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+				new CPOperand[]{input1, input2},
+				new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()});
+
+			if(isSpark) {
+				FederatedRequest frTmp = new FederatedRequest(RequestType.PUT_VAR,
+					fr2.getID(), new MatrixCharacteristics(-1, -1), mo1.getDataType());
+				mo1.getFedMapping().execute(getTID(), true, frTmp, fr2);
+			}
+			else {
+				mo1.getFedMapping().execute(getTID(), true, fr2);
+			}
+
+			int dim = (_cbind ? 1 : 0);
+			FederationMap newFedMap = mo1.getFedMapping().copyWithNewID(fr2.getID())
+				.modifyFedRanges(mo1.getDim(dim) + mo2.getDim(dim), dim);
+			out.setFedMapping(newFedMap);
+		}
+		// federated/federated misaligned, federated/local, local/federated bind
+		else if( ((mo1.isFederated(FType.ROW) || mo2.isFederated(FType.ROW)) && !_cbind)
+			|| ((mo1.isFederated(FType.COL) || mo2.isFederated(FType.COL)) && _cbind) ) {
 			long id = FederationUtils.getNextFedDataID();
 			long roff = _cbind ? 0 : dc1.getRows();
 			long coff = _cbind ? dc1.getCols() : 0;
 
-			out.setFedMapping(mo1.getFedMapping().identCopy(getTID(), id).bind(roff, coff, mo2.getFedMapping().identCopy(getTID(), id)));
+			boolean isFed1 = mo1.isFederated(_cbind ? FType.COL : FType.ROW);
+			boolean isFed2 = mo2.isFederated(_cbind ? FType.COL : FType.ROW);
+			FederationMap fed1 = isFed1 ? mo1.getFedMapping() : FederationUtils.federateLocalData(mo1);
+			FederationMap fed2 = isFed2 ? mo2.getFedMapping() : FederationUtils.federateLocalData(mo2);
+
+			out.setFedMapping(fed1.identCopy(getTID(), id)
+				.bind(roff, coff, fed2.identCopy(getTID(), id)));
 		}
-		// federated/local, local/federated cbind
-		else if( (mo1.isFederated(FType.ROW) || mo2.isFederated(FType.ROW)) && _cbind ) {
-			boolean isFed = mo1.isFederated(FType.ROW) && mo1.isFederatedExcept(FType.BROADCAST);
+		// federated/local, local/federated bind
+		else if( ((mo1.isFederated(FType.ROW) || mo2.isFederated(FType.ROW)) && _cbind)
+			|| ((mo1.isFederated(FType.COL) || mo2.isFederated(FType.COL)) && !_cbind) ) {
+			boolean isFed1 = mo1.isFederated(_cbind ? FType.ROW : FType.COL);
 			boolean isSpark = instString.contains("SPARK");
-			MatrixObject moFed = isFed ? mo1 : mo2;
-			MatrixObject moLoc = isFed ? mo2 : mo1;
+			MatrixObject moFed = isFed1 ? mo1 : mo2;
+			MatrixObject moLoc = isFed1 ? mo2 : mo1;
 			
 			//construct commands: broadcast lhs, fed append, clean broadcast
 			FederatedRequest[] fr1 = moFed.getFedMapping().broadcastSliced(moLoc, false);
 			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
-				new CPOperand[]{input1, input2}, isFed ?
+				new CPOperand[]{input1, input2}, isFed1 ?
 				new long[]{ moFed.getFedMapping().getID(), fr1[0].getID()} :
 				new long[]{ fr1[0].getID(), moFed.getFedMapping().getID()});
 			
 			//execute federated operations and set output
 			if(isSpark) {
-				FederatedRequest tmp = new FederatedRequest(FederatedRequest.RequestType.PUT_VAR, fr2.getID(), new MatrixCharacteristics(-1, -1), mo1.getDataType());
+				FederatedRequest tmp = new FederatedRequest(RequestType.PUT_VAR,
+					fr2.getID(), new MatrixCharacteristics(-1, -1), mo1.getDataType());
 				moFed.getFedMapping().execute(getTID(), true, fr1, tmp, fr2);
 			} else {
 				moFed.getFedMapping().execute(getTID(), true, fr1, fr2);
 			}
-			out.setFedMapping(moFed.getFedMapping().copyWithNewID(fr2.getID(), out.getNumColumns()));
-		}
-		// federated/local, local/federated rbind
-		else if( (mo1.isFederated(FType.ROW) || mo2.isFederated(FType.ROW)) && !_cbind) {
-			long id = FederationUtils.getNextFedDataID();
-			long roff = _cbind ? 0 : dc1.getRows();
-			long coff = _cbind ? dc1.getCols() : 0;
-			FederationMap fed1 = mo1.isFederated(FType.ROW) ?
-				mo1.getFedMapping() : FederationUtils.federateLocalData(mo1);
-			FederationMap fed2 = mo2.isFederated(FType.ROW) ?
-				mo2.getFedMapping() : FederationUtils.federateLocalData(mo2);
-			out.setFedMapping(fed1.identCopy(getTID(), id)
-				.bind(roff, coff, fed2.identCopy(getTID(), id)));
+			int dim = (_cbind ? 1 : 0);
+			FederationMap newFedMap = moFed.getFedMapping().copyWithNewID(fr2.getID())
+				.modifyFedRanges(moFed.getDim(dim) + moLoc.getDim(dim), dim);
+			out.setFedMapping(newFedMap);
 		}
 		else {
 			throw new DMLRuntimeException("Unsupported federated append: "
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
index 58a890a..813379a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
@@ -51,6 +51,8 @@ public class BinaryMatrixMatrixFEDInstruction extends BinaryFEDInstruction
 			mo2 = ec.getMatrixObject(input1);
 		}
 
+		MatrixObject fedMo; // store the matrix object where the fed requests are executed
+
 		//execute federated operation on mo1 or mo2
 		FederatedRequest fr2 = null;
 		if( mo2.isFederatedExcept(FType.BROADCAST) ) {
@@ -59,19 +61,16 @@ public class BinaryMatrixMatrixFEDInstruction extends BinaryFEDInstruction
 				fr2 = FederationUtils.callInstruction(instString, output,
 					new CPOperand[]{input1, input2},
 					new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()}, true);
-				mo1.getFedMapping().execute(getTID(), true, fr2);
+				mo2.getFedMapping().execute(getTID(), true, fr2);
 			}
-			else if ( !mo1.isFederated() ) {
+			else {
 				FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, false);
 				fr2 = FederationUtils.callInstruction(instString, output,
 					new CPOperand[]{input1, input2},
 					new long[]{fr1[0].getID(), mo2.getFedMapping().getID()}, true);
 				mo2.getFedMapping().execute(getTID(), true, fr1, fr2);
 			}
-			else {
-				throw new DMLRuntimeException("Matrix-matrix binary operations with a "
-					+ "federated right input are only supported for special cases yet.");
-			}
+			fedMo = mo2; // for setting the output federated mapping afterwards
 		}
 		else { // matrix-matrix binary operations -> lhs fed input -> fed output
 			if(mo1.isFederated(FType.FULL) ) {
@@ -112,14 +111,14 @@ public class BinaryMatrixMatrixFEDInstruction extends BinaryFEDInstruction
 			else {
 				throw new DMLRuntimeException("Matrix-matrix binary operations are only supported with a row partitioned or column partitioned federated input yet.");
 			}
+			fedMo = mo1; // for setting the output federated mapping afterwards
 		}
 
 		if ( mo1.isFederated(FType.PART) && !mo2.isFederated() )
 			setOutputFedMappingPart(mo1, mo2, fr2.getID(), ec);
-		else if ( mo1.isFederated() )
-			setOutputFedMapping(mo1, fr2.getID(), ec);
-		else if ( mo2.isFederated() )
-			setOutputFedMapping(mo2, fr2.getID(), ec);
+		else if ( fedMo.isFederated() )
+			setOutputFedMapping(fedMo, Math.max(mo1.getNumRows(), mo2.getNumRows()),
+				Math.max(mo1.getNumColumns(), mo2.getNumColumns()), fr2.getID(), ec);
 		else throw new DMLRuntimeException("Input is not federated, so the output FedMapping cannot be set!");
 	}
 
@@ -144,9 +143,16 @@ public class BinaryMatrixMatrixFEDInstruction extends BinaryFEDInstruction
 	 * @param outputFedmappingID ID for the fed mapping of output
 	 * @param ec execution context
 	 */
-	private void setOutputFedMapping(MatrixObject moFederated, long outputFedmappingID, ExecutionContext ec){
+	private void setOutputFedMapping(MatrixObject moFederated, long rowNum, long colNum,
+		long outputFedmappingID, ExecutionContext ec){
 		MatrixObject out = ec.getMatrixObject(output);
-		out.getDataCharacteristics().set(moFederated.getDataCharacteristics());
-		out.setFedMapping(moFederated.getFedMapping().copyWithNewID(outputFedmappingID));
+		FederationMap fedMap = moFederated.getFedMapping().copyWithNewID(outputFedmappingID);
+		if(moFederated.getNumRows() != rowNum || moFederated.getNumColumns() != colNum) {
+			int dim = moFederated.isFederated(FType.COL) ? 0 : 1;
+			fedMap.modifyFedRanges((dim == 0) ? rowNum : colNum, dim);
+		}
+		out.getDataCharacteristics().set(moFederated.getDataCharacteristics())
+			.setRows(rowNum).setCols(colNum);
+		out.setFedMapping(fedMap);
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index d46d493..64f1f9a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -392,42 +392,16 @@ public class FEDInstructionUtils {
 				if(data instanceof MatrixObject && ((MatrixObject) data).isFederatedExcept(FType.BROADCAST))
 					fedinst = QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
 			}
-			else if (inst instanceof AppendGAlignedSPInstruction) {
-				AppendGAlignedSPInstruction ainstruction = (AppendGAlignedSPInstruction) inst;
+			else if (inst instanceof AppendGAlignedSPInstruction || inst instanceof AppendGSPInstruction
+				|| inst instanceof AppendMSPInstruction || inst instanceof AppendRSPInstruction) {
+				BinarySPInstruction ainstruction = (BinarySPInstruction) inst;
 				Data data1 = ec.getVariable(ainstruction.input1);
 				Data data2 = ec.getVariable(ainstruction.input2);
-				if (data1 instanceof MatrixObject && ((MatrixObject) data1).isFederatedExcept(FType.BROADCAST)
-					&& (! ((CacheableData<?>)data2).isFederated() || ((CacheableData<?>)data2).isFederatedExcept(FType.BROADCAST))) {
+				if ((data1 instanceof MatrixObject && ((MatrixObject) data1).isFederatedExcept(FType.BROADCAST))
+					|| (data2 instanceof MatrixObject && ((MatrixObject) data2).isFederatedExcept(FType.BROADCAST))) {
 					fedinst = AppendFEDInstruction.parseInstruction(instruction.getInstructionString());
 				}
 			}
-			else if (inst instanceof AppendGSPInstruction) {
-				AppendGSPInstruction ainstruction = (AppendGSPInstruction) inst;
-				Data data1 = ec.getVariable(ainstruction.input1);
-				Data data2 = ec.getVariable(ainstruction.input2);
-				if(data1 instanceof MatrixObject && ((MatrixObject) data1).isFederatedExcept(FType.BROADCAST)
-					&& (! ((CacheableData<?>)data2).isFederated() || ((CacheableData<?>)data2).isFederatedExcept(FType.BROADCAST))) {
-					fedinst = AppendFEDInstruction.parseInstruction(ainstruction.getInstructionString());
-				}
-			}
-			else  if (inst instanceof AppendMSPInstruction) {
-				AppendMSPInstruction ainstruction = (AppendMSPInstruction) inst;
-				Data data1 = ec.getVariable(ainstruction.input1);
-				Data data2 = ec.getVariable(ainstruction.input2);
-				if(((CacheableData<?>) data1).isFederatedExcept(FType.BROADCAST) && (! ((CacheableData<?>)data2).isFederated()
-					|| ((CacheableData<?>)data2).isFederatedExcept(FType.BROADCAST))) {
-					fedinst = AppendFEDInstruction.parseInstruction(ainstruction.getInstructionString());
-				}
-			}
-			else  if (inst instanceof AppendRSPInstruction) {
-				AppendRSPInstruction ainstruction = (AppendRSPInstruction) inst;
-				Data data1 = ec.getVariable(ainstruction.input1);
-				Data data2 = ec.getVariable(ainstruction.input2);
-				if(((CacheableData<?>) data1).isFederatedExcept(FType.BROADCAST) && (! ((CacheableData<?>)data2).isFederated()
-					|| ((CacheableData<?>)data2).isFederatedExcept(FType.BROADCAST))) {
-					fedinst = AppendFEDInstruction.parseInstruction(ainstruction.getInstructionString());
-				}
-			}
 			else if (inst instanceof BinaryMatrixScalarSPInstruction
 				|| inst instanceof BinaryMatrixMatrixSPInstruction
 				|| inst instanceof BinaryMatrixBVectorSPInstruction
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/MMFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/MMFEDInstruction.java
index d680f51..865696b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/MMFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/MMFEDInstruction.java
@@ -22,15 +22,16 @@ package org.apache.sysds.runtime.instructions.fed;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ExecType;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.lops.MapMult;
 import org.apache.sysds.lops.PMMJ;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap.AlignType;
@@ -73,8 +74,8 @@ public class MMFEDInstruction extends BinaryFEDInstruction
 		MatrixObject mo2 = ec.getMatrixObject(input2);
 
 		long id = FederationUtils.getNextFedDataID();
-		FederatedRequest frEmpty = new FederatedRequest(FederatedRequest.RequestType.PUT_VAR, id,
-			new MatrixCharacteristics(-1, -1), Types.DataType.MATRIX);
+		FederatedRequest frEmpty = new FederatedRequest(FederatedRequest.RequestType.PUT_VAR,
+			id, new MatrixCharacteristics(-1, -1), DataType.MATRIX);
 
 		//TODO cleanup unnecessary redundancy
 		//#1 federated matrix-vector multiplication
@@ -82,19 +83,14 @@ public class MMFEDInstruction extends BinaryFEDInstruction
 			&& mo1.getFedMapping().isAligned(mo2.getFedMapping(), AlignType.COL_T) ) {
 			FederatedRequest fr1 = FederationUtils.callInstruction(instString, output, id,
 				new CPOperand[]{input1, input2},
-				new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()}, Types.ExecType.SPARK,false);
+				new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()}, ExecType.SPARK, false);
 
 			if ( _fedOut.isForcedFederated() ){
 				mo1.getFedMapping().execute(getTID(), frEmpty, fr1);
 				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr1.getID(), ec);
 			}
 			else {
-				FederatedRequest fr2 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
-				FederatedRequest fr3 = mo2.getFedMapping().cleanup(getTID(), fr1.getID(), fr2.getID());
-				//execute federated operations and aggregate
-				Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), frEmpty, fr1, fr2, fr3);
-				MatrixBlock ret = FederationUtils.aggAdd(tmp);
-				ec.setMatrixOutput(output.getName(), ret);
+				aggregateLocally(mo1.getFedMapping(), true, ec, frEmpty, fr1);
 			}
 		}
 		else if(mo1.isFederated(FType.ROW) || mo1.isFederated(FType.PART)) { // MV + MM
@@ -102,110 +98,54 @@ public class MMFEDInstruction extends BinaryFEDInstruction
 			FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
 			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output, id,
 				new CPOperand[]{input1, input2},
-				new long[]{mo1.getFedMapping().getID(), fr1.getID()}, Types.ExecType.SPARK, false);
-			if( mo2.getNumColumns() == 1 ) { //MV
-				if ( _fedOut.isForcedFederated() ){
-					mo1.getFedMapping().execute(getTID(), frEmpty, fr1, fr2);
-					if ( mo1.isFederated(FType.PART) )
-						setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-					else
-						setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), frEmpty, fr1, fr2, fr3, fr4);
-					MatrixBlock ret;
-					if ( mo1.isFederated(FType.PART) )
-						ret = FederationUtils.aggAdd(tmp);
-					else
-						ret = FederationUtils.bind(tmp, false);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+				new long[]{mo1.getFedMapping().getID(), fr1.getID()}, ExecType.SPARK, false);
+
+			boolean isVector = (mo2.getNumColumns() == 1);
+			boolean isPartOut = mo1.isFederated(FType.PART) || // MV and MM
+				(!isVector && mo2.isFederated(FType.PART)); // only MM
+			if(isPartOut && _fedOut.isForcedFederated()) {
+				mo1.getFedMapping().execute(getTID(), true, frEmpty, fr1, fr2);
+				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
+			}
+			else if((_fedOut.isForcedFederated() || (!isVector && !_fedOut.isForcedLocal()))
+				&& !isPartOut) { // not creating federated output in the MV case for reasons of performance
+				mo1.getFedMapping().execute(getTID(), true, frEmpty, fr1, fr2);
+				setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
-			else { //MM
-				//execute federated operations and aggregate
-				if ( !_fedOut.isForcedLocal() ){
-					mo1.getFedMapping().execute(getTID(), true, frEmpty, fr1, fr2);
-					if ( mo1.isFederated(FType.PART) || mo2.isFederated(FType.PART) )
-						setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-					else
-						setOutputFedMapping(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), frEmpty, fr1, fr2, fr3, fr4);
-					MatrixBlock ret;
-					if ( mo1.isFederated(FType.PART) )
-						ret = FederationUtils.aggAdd(tmp);
-					else
-						ret = FederationUtils.bind(tmp, false);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+			else {
+				aggregateLocally(mo1.getFedMapping(), mo1.isFederated(FType.PART), ec, frEmpty, fr1, fr2);
 			}
 		}
 		//#2 vector - federated matrix multiplication
 		else if (mo2.isFederated(FType.ROW)) {// VM + MM
-			if ( mo1.isFederated(FType.COL) && isAggBinaryFedAligned(mo1,mo2) ){
-				FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
-					new CPOperand[]{input1, input2},
-					new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()}, true);
-				if ( _fedOut.isForcedFederated() ){
-					// Partial aggregates (set fedmapping to the partial aggs)
-					mo2.getFedMapping().execute(getTID(), true, fr2);
-					setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo2.getFedMapping().execute(getTID(), fr2, fr3);
-					MatrixBlock ret = FederationUtils.aggAdd(tmp);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+			//construct commands: broadcast rhs, fed mv, retrieve results
+			FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, true);
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output, id,
+				new CPOperand[]{input1, input2},
+				new long[]{fr1[0].getID(), mo2.getFedMapping().getID()}, ExecType.SPARK, false);
+			if ( _fedOut.isForcedFederated() ){
+				// Partial aggregates (set fedmapping to the partial aggs)
+				mo2.getFedMapping().execute(getTID(), true, fr1, frEmpty, fr2);
+				setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
 			else {
-				//construct commands: broadcast rhs, fed mv, retrieve results
-				FederatedRequest[] fr1 = mo2.getFedMapping().broadcastSliced(mo1, true);
-				FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
-					new CPOperand[]{input1, input2},
-					new long[]{fr1[0].getID(), mo2.getFedMapping().getID()}, true);
-				if ( _fedOut.isForcedFederated() ){
-					// Partial aggregates (set fedmapping to the partial aggs)
-					mo2.getFedMapping().execute(getTID(), true, fr1, fr2);
-					setPartialOutput(mo2.getFedMapping(), mo1, mo2, fr2.getID(), ec);
-				}
-				else {
-					FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
-					FederatedRequest fr4 = mo2.getFedMapping().cleanup(getTID(), fr2.getID());
-					//execute federated operations and aggregate
-					Future<FederatedResponse>[] tmp = mo2.getFedMapping().execute(getTID(), true, fr1, fr2, fr3, fr4);
-					MatrixBlock ret = FederationUtils.aggAdd(tmp);
-					ec.setMatrixOutput(output.getName(), ret);
-				}
+				aggregateLocally(mo2.getFedMapping(), true, ec, fr1, frEmpty, fr2);
 			}
 		}
 		//#3 col-federated matrix vector multiplication
 		else if (mo1.isFederated(FType.COL)) {// VM + MM
 			//construct commands: broadcast rhs, fed mv, retrieve results
 			FederatedRequest[] fr1 = mo1.getFedMapping().broadcastSliced(mo2, true);
-			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
+			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output, id,
 				new CPOperand[]{input1, input2},
-				new long[]{mo1.getFedMapping().getID(), fr1[0].getID()}, true);
+				new long[]{mo1.getFedMapping().getID(), fr1[0].getID()}, ExecType.SPARK, false);
 			if ( _fedOut.isForcedFederated() ){
 				// Partial aggregates (set fedmapping to the partial aggs)
-				mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+				mo1.getFedMapping().execute(getTID(), true, fr1, frEmpty, fr2);
 				setPartialOutput(mo1.getFedMapping(), mo1, mo2, fr2.getID(), ec);
 			}
 			else {
-				FederatedRequest fr3 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
-				FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr2.getID());
-				//execute federated operations and aggregate
-				Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
-				MatrixBlock ret = FederationUtils.aggAdd(tmp);
-				ec.setMatrixOutput(output.getName(), ret);
+				aggregateLocally(mo1.getFedMapping(), true, ec, fr1, frEmpty, fr2);
 			}
 		}
 		else { //other combinations
@@ -216,28 +156,6 @@ public class MMFEDInstruction extends BinaryFEDInstruction
 	}
 
 	/**
-	 * Checks alignment of dimensions for the federated aggregate binary processing without broadcast.
-	 * If the begin and end ranges of mo1 has cols equal to the rows of the begin and end ranges of mo2,
-	 * the two inputs are aligned for the processing of the federated aggregate binary instruction without broadcasting.
-	 * @param mo1 input matrix object 1
-	 * @param mo2 input matrix object 2
-	 * @return true if the two inputs are aligned for aggregate binary processing without broadcasting
-	 */
-	private static boolean isAggBinaryFedAligned(MatrixObject mo1, MatrixObject mo2){
-		FederatedRange[] mo1FederatedRanges = mo1.getFedMapping().getFederatedRanges();
-		FederatedRange[] mo2FederatedRanges = mo2.getFedMapping().getFederatedRanges();
-		for ( int i = 0; i < mo1FederatedRanges.length; i++ ){
-			FederatedRange mo1FedRange = mo1FederatedRanges[i];
-			FederatedRange mo2FedRange = mo2FederatedRanges[i];
-
-			if ( mo1FedRange.getBeginDims()[1] != mo2FedRange.getBeginDims()[0]
-				|| mo1FedRange.getEndDims()[1] != mo2FedRange.getEndDims()[0])
-				return false;
-		}
-		return true;
-	}
-
-	/**
 	 * Sets the output with a federated mapping of overlapping partial aggregates.
 	 * @param federationMap federated map from which the federated metadata is retrieved
 	 * @param mo1 matrix object with number of rows used to set the number of rows of the output
@@ -268,4 +186,39 @@ public class MMFEDInstruction extends BinaryFEDInstruction
 		out.getDataCharacteristics().set(mo1.getNumRows(), mo2.getNumColumns(), (int)mo1.getBlocksize());
 		out.setFedMapping(federationMap.copyWithNewID(outputID, mo2.getNumColumns()));
 	}
+
+	private void aggregateLocally(FederationMap fedMap, boolean aggAdd, ExecutionContext ec,
+		FederatedRequest... fr) {
+		aggregateLocally(fedMap, aggAdd, ec, null, fr); // no sliced broadcast: delegate with frSliced == null
+	}
+
+	/**
+	 * Appends GET_VAR and cleanup requests for the instruction output, executes all requests, and aggregates locally.
+	 * @param fedMap the federated mapping on which the requests are executed and the cleanup request is created
+	 * @param aggAdd true to aggregate the responses via aggAdd, false to combine them via bind
+	 * @param ec execution context in which the aggregated matrix is set as the instruction output
+	 * @param frSliced the federated request array from a sliced broadcast, or null if there is none
+	 * @param fr the previous federated requests; must be non-empty (fr[fr.length - 1] is read)
+	 * NOTE: the last federated request fr has to be the instruction call
+	 */
+	private void aggregateLocally(FederationMap fedMap, boolean aggAdd,
+		ExecutionContext ec, FederatedRequest[] frSliced, FederatedRequest... fr) {
+		long callInstID = fr[fr.length - 1].getID(); // ID of the last request, i.e., the instruction call
+		FederatedRequest frG = new FederatedRequest(RequestType.GET_VAR, callInstID); // fetch the call's result
+		FederatedRequest frC = fedMap.cleanup(getTID(), callInstID); // cleanup request for the same ID
+
+		//execute federated operations and aggregate
+		Future<FederatedResponse>[] ffr;
+		if(frSliced != null) // sliced broadcast requests are passed as a separate leading argument
+			ffr = fedMap.execute(getTID(), frSliced, ArrayUtils.addAll(fr, frG, frC));
+		else
+			ffr = fedMap.execute(getTID(), ArrayUtils.addAll(fr, frG, frC));
+
+		MatrixBlock ret;
+		if ( aggAdd ) // combine the per-worker responses into one local block
+			ret = FederationUtils.aggAdd(ffr);
+		else
+			ret = FederationUtils.bind(ffr, false);
+		ec.setMatrixOutput(output.getName(), ret); // publish the local result under the instruction's output name
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/QuaternaryWDivMMFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuaternaryWDivMMFEDInstruction.java
index 414a4ff..c3fbb08 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/QuaternaryWDivMMFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuaternaryWDivMMFEDInstruction.java
@@ -41,7 +41,6 @@ import org.apache.sysds.runtime.matrix.operators.QuaternaryOperator;
 
 import java.util.ArrayList;
 import java.util.concurrent.Future;
-import java.util.stream.IntStream;
 
 public class QuaternaryWDivMMFEDInstruction extends QuaternaryFEDInstruction
 {
@@ -216,32 +215,15 @@ public class QuaternaryWDivMMFEDInstruction extends QuaternaryFEDInstruction
 			// LEFT: nrows of transposed X, ncols of U
 			rows = X.getNumColumns();
 			cols = U.getNumColumns();
-			outFedMap = modifyFedRanges(outFedMap.transpose(), cols, 1);
+			outFedMap.transpose().modifyFedRanges(cols, 1);
 		}
 		else if(wdivmm_type.isRight()) {
 			// RIGHT: nrows of X, ncols of V
 			rows = X.getNumRows();
 			cols = V.getNumColumns();
-			outFedMap = modifyFedRanges(outFedMap, cols, 1);
+			outFedMap.modifyFedRanges(cols, 1);
 		}
 		out.setFedMapping(outFedMap);
 		out.getDataCharacteristics().set(rows, cols, (int) X.getBlocksize());
 	}
-
-	/**
-	 * Takes the federated mapping and sets one dimension of all federated ranges
-	 * to the specified value.
-	 *
-	 * @param fedMap     the original federated mapping
-	 * @param value      long value for setting the dimension
-	 * @param dim        indicates if the row (0) or column (1) dimension should be set to value
-	 * @return FederationMap with the modified federated ranges
-	 */
-	private static FederationMap modifyFedRanges(FederationMap fedMap, long value, int dim) {
-		IntStream.range(0, fedMap.getFederatedRanges().length).forEach(i -> {
-			fedMap.getFederatedRanges()[i].setBeginDim(dim, 0);
-			fedMap.getFederatedRanges()[i].setEndDim(dim, value);
-		});
-		return fedMap;
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/SpoofFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/SpoofFEDInstruction.java
index 331ecfc..cd05ad5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/SpoofFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/SpoofFEDInstruction.java
@@ -50,7 +50,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.concurrent.Future;
-import java.util.stream.IntStream;
 
 public class SpoofFEDInstruction extends FEDInstruction
 {
@@ -269,10 +268,7 @@ public class SpoofFEDInstruction extends FEDInstruction
 			if(_cellType == CellType.ROW_AGG || _cellType == CellType.COL_AGG) {
 				int dim = (_cellType == CellType.COL_AGG ? 0 : 1);
 				// crop federation map to a vector
-				IntStream.range(0, fedMap.getFederatedRanges().length).forEach(i -> {
-					fedMap.getFederatedRanges()[i].setBeginDim(dim, 0);
-					fedMap.getFederatedRanges()[i].setEndDim(dim, 1);
-				});
+				fedMap.modifyFedRanges(1, dim);
 			}
 			return fedMap;
 		}
@@ -333,18 +329,10 @@ public class SpoofFEDInstruction extends FEDInstruction
 		protected void setFedOutput(ExecutionContext ec, FederationMap fedMap, long frComputeID) {
 			// derive output federated mapping
 			MatrixObject out = ec.getMatrixObject(_output);
-			FederationMap newFedMap = modifyFedRanges(fedMap.copyWithNewID(frComputeID), out.getNumColumns());
+			FederationMap newFedMap = fedMap.copyWithNewID(frComputeID).modifyFedRanges(out.getNumColumns(), 1);
 			out.setFedMapping(newFedMap);
 		}
 
-		private static FederationMap modifyFedRanges(FederationMap fedMap, long cols) {
-			IntStream.range(0, fedMap.getFederatedRanges().length).forEach(i -> {
-				fedMap.getFederatedRanges()[i].setBeginDim(1, 0);
-				fedMap.getFederatedRanges()[i].setEndDim(1, cols);
-			});
-			return fedMap;
-		}
-
 		protected void aggResult(ExecutionContext ec, Future<FederatedResponse>[] response,
 			FederationMap fedMap) {
 			if(_fedType != FType.ROW)
@@ -466,18 +454,10 @@ public class SpoofFEDInstruction extends FEDInstruction
 			// derive output federated mapping
 			MatrixObject out = ec.getMatrixObject(_output);
 			int dim = (newFedMap.getType() == FType.ROW ? 1 : 0);
-			newFedMap = modifyFedRanges(newFedMap, dim, outDims[dim]);
+			newFedMap.modifyFedRanges(outDims[dim], dim);
 			out.setFedMapping(newFedMap);
 		}
 
-		private static FederationMap modifyFedRanges(FederationMap fedMap, int dim, long value) {
-			IntStream.range(0, fedMap.getFederatedRanges().length).forEach(i -> {
-				fedMap.getFederatedRanges()[i].setBeginDim(dim, 0);
-				fedMap.getFederatedRanges()[i].setEndDim(dim, value);
-			});
-			return fedMap;
-		}
-
 		protected void aggResult(ExecutionContext ec, Future<FederatedResponse>[] response,
 			FederationMap fedMap) {
 			AggregateUnaryOperator aop = InstructionUtils.parseBasicAggregateUnaryOperator("uak+");
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMisAlignedTest.java
similarity index 54%
copy from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
copy to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMisAlignedTest.java
index ab69143..ecc8a7b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMisAlignedTest.java
@@ -30,23 +30,18 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedRowAggregateTest extends AutomatedTestBase {
-	private final static String TEST_NAME5 = "FederatedRowSumTest";
-	private final static String TEST_NAME6 = "FederatedRowMeanTest";
-	private final static String TEST_NAME7 = "FederatedRowMaxTest";
-	private final static String TEST_NAME8 = "FederatedRowMinTest";
-	private final static String TEST_NAME9 = "FederatedRowVarTest";
-	private final static String TEST_NAME10 = "FederatedRowProdTest";
-	private final static String TEST_NAME11 = "FederatedMMTest";
-
-	private final static String TEST_DIR = "functions/federated/aggregate/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowAggregateTest.class.getSimpleName() + "/";
+public class FederatedMisAlignedTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "FederatedMisAlignedTest";
+
+	private final static String TEST_DIR = "functions/federated/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedMisAlignedTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
 	@Parameterized.Parameter()
@@ -66,122 +61,138 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 	}
 
 	private enum OpType {
-		SUM, MEAN, MAX, MIN, VAR, PROD, MM
+		MM,
+		EW_MULT,
+		EW_PLUS,
+		EW_GREATER,
+		BIND,
+	}
+
+	private enum MisAlignmentType {
+		HOST,
+		RANGE,
 	}
 
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME9, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME9, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME10, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME10, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME11, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME11, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+	}
+
+	@Test
+	public void testMMMisAlignedHostCP() {
+		runMisAlignedTest(OpType.MM, ExecMode.SINGLE_NODE, MisAlignmentType.HOST);
+	}
+
+	@Test
+	public void testMMMisAlignedHostSP() {
+		runMisAlignedTest(OpType.MM, ExecMode.SPARK, MisAlignmentType.HOST);
+	}
+
+	@Test
+	public void testMMMisAlignedRangeCP() {
+		runMisAlignedTest(OpType.MM, ExecMode.SINGLE_NODE, MisAlignmentType.RANGE);
+	}
+
+	@Test
+	public void testMMMisAlignedRangeSP() {
+		runMisAlignedTest(OpType.MM, ExecMode.SPARK, MisAlignmentType.RANGE);
+	}
+
+	@Test
+	public void testEWMultMisAlignedHostCP() {
+		runMisAlignedTest(OpType.EW_MULT, ExecMode.SINGLE_NODE, MisAlignmentType.HOST);
+	}
+
+	@Test
+	@Ignore
+	public void testEWMultMisAlignedHostSP() {
+		runMisAlignedTest(OpType.EW_MULT, ExecMode.SPARK, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowSumDenseMatrixCP() {
-		runAggregateOperationTest(OpType.SUM, ExecMode.SINGLE_NODE);
+	@Ignore
+	public void testEWMultMisAlignedRangeCP() {
+		runMisAlignedTest(OpType.EW_MULT, ExecMode.SINGLE_NODE, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testRowMeanDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MEAN, ExecMode.SINGLE_NODE);
+	public void testEWMultMisAlignedRangeSP() {
+		runMisAlignedTest(OpType.EW_MULT, ExecMode.SPARK, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testRowMaxDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MAX, ExecMode.SINGLE_NODE);
+	@Ignore
+	public void testEWPlusMisAlignedHostCP() {
+		runMisAlignedTest(OpType.EW_PLUS, ExecMode.SINGLE_NODE, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowMinDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MIN, ExecMode.SINGLE_NODE);
+	public void testEWPlusMisAlignedHostSP() {
+		runMisAlignedTest(OpType.EW_PLUS, ExecMode.SPARK, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowVarDenseMatrixCP() {
-		runAggregateOperationTest(OpType.VAR, ExecMode.SINGLE_NODE);
+	public void testEWPlusMisAlignedRangeCP() {
+		runMisAlignedTest(OpType.EW_PLUS, ExecMode.SINGLE_NODE, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testRowProdDenseMatrixCP() {
-		runAggregateOperationTest(OpType.PROD, ExecMode.SINGLE_NODE);
+	@Ignore
+	public void testEWPlusMisAlignedRangeSP() {
+		runMisAlignedTest(OpType.EW_PLUS, ExecMode.SPARK, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testMMDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MM, ExecMode.SINGLE_NODE);
+	public void testEWGreaterMisAlignedHostCP() {
+		runMisAlignedTest(OpType.EW_GREATER, ExecMode.SINGLE_NODE, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowSumDenseMatrixSP() {
-		runAggregateOperationTest(OpType.SUM, ExecMode.SPARK);
+	@Ignore
+	public void testEWGreaterMisAlignedHostSP() {
+		runMisAlignedTest(OpType.EW_GREATER, ExecMode.SPARK, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowMeanDenseMatrixSP() {
-		runAggregateOperationTest(OpType.MEAN, ExecMode.SPARK);
+	@Ignore
+	public void testEWGreaterMisAlignedRangeCP() {
+		runMisAlignedTest(OpType.EW_GREATER, ExecMode.SINGLE_NODE, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testRowMaxDenseMatrixSP() {
-		runAggregateOperationTest(OpType.MAX, ExecMode.SPARK);
+	public void testEWGreaterMisAlignedRangeSP() {
+		runMisAlignedTest(OpType.EW_GREATER, ExecMode.SPARK, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testRowMinDenseMatrixSP() {
-		runAggregateOperationTest(OpType.MIN, ExecMode.SPARK);
+	public void testBindMisAlignedHostCP() {
+		runMisAlignedTest(OpType.BIND, ExecMode.SINGLE_NODE, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowVarDenseMatrixSP() {
-		runAggregateOperationTest(OpType.VAR, ExecMode.SPARK);
+	public void testBindMisAlignedHostSP() {
+		runMisAlignedTest(OpType.BIND, ExecMode.SPARK, MisAlignmentType.HOST);
 	}
 
 	@Test
-	public void testRowProdDenseMatrixSP() {
-		runAggregateOperationTest(OpType.PROD, ExecMode.SPARK);
+	public void testBindMisAlignedRangeCP() {
+		runMisAlignedTest(OpType.BIND, ExecMode.SINGLE_NODE, MisAlignmentType.RANGE);
 	}
 
 	@Test
-	public void testMMDenseMatrixSP() {
-		runAggregateOperationTest(OpType.MM, ExecMode.SPARK);
+	public void testBindMisAlignedRangeSP() {
+		runMisAlignedTest(OpType.BIND, ExecMode.SPARK, MisAlignmentType.RANGE);
 	}
 
-	private void runAggregateOperationTest(OpType type, ExecMode execMode) {
+	private void runMisAlignedTest(OpType type, ExecMode execMode, MisAlignmentType maType) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
 
 		if(rtplatform == ExecMode.SPARK)
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
-		String TEST_NAME = null;
-		switch(type) {
-			case SUM:
-				TEST_NAME = TEST_NAME5;
-				break;
-			case MEAN:
-				TEST_NAME = TEST_NAME6;
-				break;
-			case MAX:
-				TEST_NAME = TEST_NAME7;
-				break;
-			case MIN:
-				TEST_NAME = TEST_NAME8;
-				break;
-			case VAR:
-				TEST_NAME = TEST_NAME9;
-				break;
-			case PROD:
-				TEST_NAME = TEST_NAME10;
-				break;
-			case MM:
-				TEST_NAME = TEST_NAME11;
-				break;
-		}
+		String TEST_NAME = TEST_NAME1;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
@@ -205,6 +216,13 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("X3", X3, false, mc);
 		writeInputMatrixWithMTD("X4", X4, false, mc);
 
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
 		// empty script name because we don't execute any script, just start the worker
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
@@ -216,58 +234,45 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
-		rtplatform = execMode;
-		if(rtplatform == ExecMode.SPARK) {
-			System.out.println(7);
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-		}
-		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
-		loadTestConfiguration(config);
-
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
-			expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+		programArgs = new String[] {"-stats", "100", "-nvargs",
+			"in_X1=" + input("X1"), "in_X2=" + input("X2"), "in_X3=" + input("X3"), "in_X4=" + input("X4"),
+			"testnum=" + Integer.toString(type.ordinal()), "misaligntype=" + Integer.toString(maType.ordinal()),
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + expected("S")};
 		runTest(true, false, null, -1);
-
+		
 		// Run actual dml script with federated matrix
-
+		
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
 		programArgs = new String[] {"-stats", "100", "-nvargs",
 			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
 			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+			"testnum=" + Integer.toString(type.ordinal()), "misaligntype=" + Integer.toString(maType.ordinal()),
 			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
 
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(type == FederatedRowAggregateTest.OpType.VAR ? 1e-2 : 1e-9, "Stat-DML1", "Stat-DML2");
-
-		String fedInst = "fed_uar";
+		compareResults(1e-9, "Stat-DML1", "Stat-DML2");
 
 		switch(type) {
-			case SUM:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("k+")));
-				break;
-			case MEAN:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("mean")));
+			case MM:
+				Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mapmm" : "fed_ba+*"));
 				break;
-			case MAX:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("max")));
+			case EW_MULT:
+				Assert.assertTrue(heavyHittersContainsString("fed_*"));
 				break;
-			case MIN:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("min")));
+			case EW_PLUS:
+				Assert.assertTrue(heavyHittersContainsString("fed_+"));
 				break;
-			case VAR:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("var")));
+			case EW_GREATER:
+				Assert.assertTrue(heavyHittersContainsString("fed_>"));
 				break;
-			case PROD:
-				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("*")));
-				break;
-			case MM:
-				Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mapmm" : "fed_ba+*"));
+			case BIND:
+				Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mappend" : "fed_append"));
 				break;
 		}
 
@@ -281,6 +286,5 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 
 		rtplatform = platformOld;
 		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index a3f37f7..fb07a58 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -28,9 +28,11 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedRCBindTest extends AutomatedTestBase {
@@ -70,7 +72,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		// F-F, F-L, L-F
 		addTestConfiguration(TEST_NAME,
 			new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,
-				new String[] {"R_FF", "R_FL", "R_LF", "C_FF", "C_FL", "C_LF"}));
+				new String[] {"R_FF_misaligned", "C_FF_aligned",
+					"R_FF", "R_FL", "R_LF", "C_FF", "C_FL", "C_LF"}));
 	}
 
 	@Test
@@ -121,7 +124,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		programArgs = new String[] {"-nvargs", "in_A1=" + input("A1"), "in_A2=" + input("A2"),
 			"in_B1=" + input("B1"), "in_B2=" + input("B2"),
 			"in_partitioned=" + Boolean.toString(partitioned).toUpperCase(),
-			"out_R_FF=" + expected("R_FF"),
+			"out_R_FF_misaligned=" + expected("R_FF_misaligned"),
+			"out_C_FF_aligned=" + expected("C_FF_aligned"), "out_R_FF=" + expected("R_FF"),
 			"out_R_FL=" + expected("R_FL"), "out_R_LF=" + expected("R_LF"), "out_C_FF=" + expected("C_FF"),
 			"out_C_FL=" + expected("C_FL"), "out_C_LF=" + expected("C_LF")};
 		runTest(true, false, null, -1);
@@ -134,13 +138,14 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-nvargs",
+		programArgs = new String[] {"-stats", "-nvargs",
 			"in_A1=" + TestUtils.federatedAddress(port1, input("A1")),
 			"in_A2=" + TestUtils.federatedAddress(port2, input("A2")),
 			"in_B1=" + TestUtils.federatedAddress(port3, input("B1")),
 			"in_B2=" + TestUtils.federatedAddress(port4, input("B2")),
 			"in_partitioned=" + Boolean.toString(partitioned).toUpperCase(),
 			"in_B1_local=" + input("B1"), "in_B2_local=" + input("B2"), "rows=" + rows, "cols=" + cols,
+			"out_R_FF_misaligned=" + output("R_FF_misaligned"), "out_C_FF_aligned=" + output("C_FF_aligned"),
 			"out_R_FF=" + output("R_FF"), "out_R_FL=" + output("R_FL"), "out_R_LF=" + output("R_LF"),
 			"out_C_FF=" + output("C_FF"), "out_C_FL=" + output("C_FL"), "out_C_LF=" + output("C_LF")};
 
@@ -149,6 +154,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		// compare all sums via files
 		compareResults(1e-11);
 
+		Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mappend" : "fed_append", 1, 8));
+
 		TestUtils.shutdownThreads(t1, t2, t3, t4);
 		rtplatform = platformOld;
 		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
index ab69143..2db9e69 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
@@ -205,6 +205,13 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("X3", X3, false, mc);
 		writeInputMatrixWithMTD("X4", X4, false, mc);
 
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
 		// empty script name because we don't execute any script, just start the worker
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
@@ -216,14 +223,6 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
-		rtplatform = execMode;
-		if(rtplatform == ExecMode.SPARK) {
-			System.out.println(7);
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-		}
-		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
-		loadTestConfiguration(config);
-
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
@@ -267,7 +266,7 @@ public class FederatedRowAggregateTest extends AutomatedTestBase {
 				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("*")));
 				break;
 			case MM:
-				Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mapmm" : "fed_ba+*"));
+				Assert.assertTrue(heavyHittersContainsString(rtplatform == ExecMode.SPARK ? "fed_mapmm" : "fed_ba+*", 1, 2));
 				break;
 		}
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedTokenizeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedTokenizeTest.java
index 02dd496..36a5cc5 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedTokenizeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedTokenizeTest.java
@@ -92,11 +92,9 @@ public class FederatedTokenizeTest extends AutomatedTestBase {
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
-		int port4 = getRandomAvailablePort();
 		Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
 		Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
-		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
-		Thread t4 = startLocalFedWorkerThread(port4);
+		Thread t3 = startLocalFedWorkerThread(port3);
 
 		FileFormatPropertiesCSV ffpCSV = new FileFormatPropertiesCSV(false, DataExpression.DEFAULT_DELIM_DELIMITER, false);
 
@@ -139,7 +137,7 @@ public class FederatedTokenizeTest extends AutomatedTestBase {
 		runTest(null);
 		compareResults(1e-9);
 		Assert.assertTrue(heavyHittersContainsString("fed_tokenize"));
-		TestUtils.shutdownThreads(t1, t2, t3, t4);
+		TestUtils.shutdownThreads(t1, t2, t3);
 	}
 
 	private void writeDatasetSlice(FrameBlock dataset, FrameWriter fw, FileFormatPropertiesCSV ffpCSV, String name) throws IOException {
diff --git a/src/test/scripts/functions/federated/FederatedMisAlignedTest.dml b/src/test/scripts/functions/federated/FederatedMisAlignedTest.dml
new file mode 100644
index 0000000..13c2403
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedMisAlignedTest.dml
@@ -0,0 +1,88 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+testnum = $testnum; # operation under test: 0=MM, 1=EW_MULT, 2=EW_PLUS, 3=EW_GREATER, 4=BIND
+misaligntype = $misaligntype; # 0 = misaligned hosts, 1 = misaligned ranges
+
+if(misaligntype == 0) { # misaligned hosts: X and Y use identical ranges but different worker order
+  if ($rP) { # row-partitioned: four consecutive row ranges with boundaries at multiples of $rows/4
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+      ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    	 list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+    Y = federated(addresses=list($in_X1, $in_X2, $in_X4, $in_X3), # addresses 3 and 4 swapped relative to X
+      ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+  		  list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+  } else { # column-partitioned: four consecutive column ranges spanning all $rows rows
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+      ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+      	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+    Y = federated(addresses=list($in_X1, $in_X2, $in_X4, $in_X3), # addresses 3 and 4 swapped relative to X
+      ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+      	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+  }
+}
+else if(misaligntype == 1) { # misaligned ranges: X and Y are slices of one federated matrix, shifted by one
+  if ($rP) { # row-partitioned input
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+    Y = X[2:nrow(X), ]; # all rows except the first
+    X = X[1:(nrow(X)-1), ]; # all rows except the last; same shape as Y, offset by one row
+  } else { # column-partitioned input
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+    Y = X[ , 2:ncol(X)]; # all columns except the first
+    X = X[ , 1:(ncol(X)-1)]; # all columns except the last; same shape as Y, offset by one column
+  }
+}
+
+
+while(FALSE) { } # NOTE(review): empty loop, presumably a statement-block cut between input setup and the operation - confirm
+
+if(testnum == 0) { # MM
+  if($rP) {
+    X = t(X); # col partitioned federated X
+  }
+  else {
+    Y = t(Y); # row partitioned federated Y
+  }
+  while(FALSE) { } # NOTE(review): presumably keeps the transpose in a separate statement block from the matrix multiply - confirm
+
+  S = X %*% Y;
+}
+else if(testnum == 1) { # EW_MULT
+  S = X * Y;
+}
+else if(testnum == 2) { # EW_PLUS
+  S = X + Y;
+}
+else if(testnum == 3) { # EW_GREATER
+  S = X > Y;
+}
+else if(testnum == 4) { # BIND
+  if($rP) # row-partitioned inputs are bound column-wise
+    S = cbind(X, Y);
+  else # column-partitioned inputs are bound row-wise
+    S = rbind(X, Y);
+}
+
+write(S, $out_S);
diff --git a/src/test/scripts/functions/federated/FederatedMisAlignedTestReference.dml b/src/test/scripts/functions/federated/FederatedMisAlignedTestReference.dml
new file mode 100644
index 0000000..f4ef17d
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedMisAlignedTestReference.dml
@@ -0,0 +1,74 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+testnum = $testnum; # operation under test: 0=MM, 1=EW_MULT, 2=EW_PLUS, 3=EW_GREATER, 4=BIND
+misaligntype = $misaligntype; # 0 = misaligned hosts, 1 = misaligned ranges
+
+if(misaligntype == 0) { # misaligned hosts: Y binds the same four parts with parts 3 and 4 swapped
+  if($rP) { # row-partitioned: the four parts are stacked row-wise
+    X = rbind(read($in_X1), read($in_X2), read($in_X3), read($in_X4));
+    Y = rbind(read($in_X1), read($in_X2), read($in_X4), read($in_X3)); # switched partition 3 and 4
+  }
+  else { # column-partitioned: the four parts are placed side by side
+    X = cbind(read($in_X1), read($in_X2), read($in_X3), read($in_X4));
+    Y = cbind(read($in_X1), read($in_X2), read($in_X4), read($in_X3)); # switched partition 3 and 4
+  }
+}
+else if(misaligntype == 1) { # misaligned ranges: X and Y are slices of one matrix, shifted by one
+  if($rP) {
+    X = rbind(read($in_X1), read($in_X2), read($in_X3), read($in_X4));
+    Y = X[2:nrow(X), ]; # all rows except the first
+    X = X[1:(nrow(X)-1), ]; # all rows except the last; same shape as Y, offset by one row
+  }
+  else {
+    X = cbind(read($in_X1), read($in_X2), read($in_X3), read($in_X4));
+    Y = X[ , 2:ncol(X)]; # all columns except the first
+    X = X[ , 1:(ncol(X)-1)]; # all columns except the last; same shape as Y, offset by one column
+  }
+}
+
+if(testnum == 0) { # MM
+  if($rP) {
+    X = t(X); # transposed so that X %*% Y has matching inner dimensions
+  }
+  else {
+    Y = t(Y); # transposed so that X %*% Y has matching inner dimensions
+  }
+
+  S = X %*% Y;
+}
+else if(testnum == 1) { # EW_MULT
+  S = X * Y;
+}
+else if(testnum == 2) { # EW_PLUS
+  S = X + Y;
+}
+else if(testnum == 3) { # EW_GREATER
+  S = X > Y;
+}
+else if(testnum == 4) { # BIND
+  if($rP) # row-stacked inputs are bound column-wise
+    S = cbind(X, Y);
+  else # column-stacked inputs are bound row-wise
+    S = rbind(X, Y);
+}
+
+write(S, $out_S);
diff --git a/src/test/scripts/functions/federated/FederatedRCBindTest.dml b/src/test/scripts/functions/federated/FederatedRCBindTest.dml
index 422b2cb..936c4bc 100644
--- a/src/test/scripts/functions/federated/FederatedRCBindTest.dml
+++ b/src/test/scripts/functions/federated/FederatedRCBindTest.dml
@@ -31,7 +31,8 @@ else {
   B = read($in_B1_local);
 }
 
-
+R_FF_misaligned = rbind(AF, AF);
+C_FF_aligned = cbind(AF, AF);
 R_FF = rbind(AF, BF)
 C_FF = cbind(AF, BF)
 R_FL = rbind(AF, B)
@@ -39,6 +40,9 @@ C_FL = cbind(AF, B)
 R_LF = rbind(B, AF)
 C_LF = cbind(B, AF)
 
+write(R_FF_misaligned, $out_R_FF_misaligned);
+write(C_FF_aligned, $out_C_FF_aligned);
+
 write(R_FF, $out_R_FF)
 write(R_FL, $out_R_FL)
 write(R_LF, $out_R_LF)
diff --git a/src/test/scripts/functions/federated/FederatedRCBindTestReference.dml b/src/test/scripts/functions/federated/FederatedRCBindTestReference.dml
index 30712e0..6bcc1e2 100644
--- a/src/test/scripts/functions/federated/FederatedRCBindTestReference.dml
+++ b/src/test/scripts/functions/federated/FederatedRCBindTestReference.dml
@@ -32,9 +32,14 @@ R = rbind(A, B)
 C = cbind(A, B)
 R_LF = rbind(B, A)
 C_LF = cbind(B, A)
+R_FF_misaligned = rbind(A, A);
+C_FF_aligned = cbind(A, A);
+
 write(R, $out_R_FF)
 write(R, $out_R_FL)
 write(R_LF, $out_R_LF)
 write(C, $out_C_FF)
 write(C, $out_C_FL)
 write(C_LF, $out_C_LF)
+write(R_FF_misaligned, $out_R_FF_misaligned);
+write(C_FF_aligned, $out_C_FF_aligned);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMMTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMMTest.dml
index 9ba4176..2ef2bac 100644
--- a/src/test/scripts/functions/federated/aggregate/FederatedMMTest.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMMTest.dml
@@ -32,7 +32,16 @@ if ($rP) {
     Y = t(X) * 7; # row partitioned federated Y
 }
 
+Z1 = Y %*% seq(1, ncol(Y));
+
+while(FALSE) { }
+
+Z2 = X %*% Y;
+
 while(FALSE) { }
 
-s = X %*% Y;
+sZ1 = sum(Z1);
+
+s = sZ1 + Z2;
+
 write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMMTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMMTestReference.dml
index 47f1ce0..ffaa21f 100644
--- a/src/test/scripts/functions/federated/aggregate/FederatedMMTestReference.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMMTestReference.dml
@@ -29,7 +29,10 @@ else {
   Y = t(X) * 7;
 }
 
-while(FALSE) { }
+Z1 = Y %*% seq(1, ncol(Y));
+
+Z2 = X %*% Y;
+
+s = sum(Z1) + Z2;
 
-s = X %*% Y;
 write(s, $5);