You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/11/14 20:15:50 UTC

[systemds] branch master updated (d61c3bf -> 914b8f8)

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git.


    from d61c3bf  [SYSTEMDS-2724] Cast to matrix Federated
     new d000364  [SYSTEMDS-2727-9] Federated CM, Var, qsort & qpick
     new 809d53f  [MINOR] Modifications to Federated Tests
     new 914b8f8  [MINOR] Federated Modifications

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/functionsTests.yml               |   4 +-
 scripts/builtin/dist.dml                           |   4 +-
 src/main/java/org/apache/sysds/lops/MMTSJ.java     |   3 +
 .../controlprogram/federated/FederationUtils.java  | 133 ++++++++++++-
 .../fed/AggregateUnaryFEDInstruction.java          |  55 +++++-
 .../fed/BinaryMatrixScalarFEDInstruction.java      |   2 +-
 .../fed/CentralMomentFEDInstruction.java           | 187 ++++++++++++++++++
 .../runtime/instructions/fed/FEDInstruction.java   |   4 +-
 .../instructions/fed/FEDInstructionUtils.java      | 117 ++++++++----
 .../fed/MatrixIndexingFEDInstruction.java          |   4 +-
 .../fed/QuantilePickFEDInstruction.java            | 210 +++++++++++++++++++++
 .../fed/QuantileSortFEDInstruction.java            | 163 ++++++++++++++++
 .../instructions/fed/ReorgFEDInstruction.java      |   4 +-
 .../instructions/fed/TsmmFEDInstruction.java       |   4 +-
 .../sysds/runtime/privacy/PrivacyMonitor.java      |   2 +
 .../org/apache/sysds/test/AutomatedTestBase.java   |  15 +-
 .../federated/algorithms/FederatedBivarTest.java   |   9 +-
 ...eratedUnivarTest.java => FederatedCorTest.java} | 115 +++++------
 .../federated/algorithms/FederatedGLMTest.java     |  24 +--
 .../federated/algorithms/FederatedKmeansTest.java  |  58 +++---
 .../federated/algorithms/FederatedL2SVMTest.java   |   9 +-
 .../federated/algorithms/FederatedLogRegTest.java  |   7 +-
 .../federated/algorithms/FederatedPCATest.java     |  36 ++--
 .../federated/algorithms/FederatedUnivarTest.java  |  29 ++-
 .../FederatedVarTest.java}                         |  99 +++-------
 .../federated/algorithms/FederatedYL2SVMTest.java  |   2 +-
 .../federated/io/FederatedReaderTest.java          |   2 +-
 .../functions/federated/io/FederatedSSLTest.java   |   4 +-
 .../federated/io/FederatedWriterTest.java          |   4 +-
 .../paramserv/FederatedParamservTest.java          |  70 ++++---
 .../primitives/FederatedBinaryMatrixTest.java      |  18 +-
 .../primitives/FederatedBinaryVectorTest.java      |  14 +-
 ...rameTest.java => FederatedCastToFrameTest.java} |   8 +-
 ...rixTest.java => FederatedCastToMatrixTest.java} |   8 +-
 .../primitives/FederatedCentralMomentTest.java     | 147 +++++++++++++++
 ...ateTest.java => FederatedColAggregateTest.java} |  87 ++++-----
 .../primitives/FederatedConstructionTest.java      |  21 ++-
 .../primitives/FederatedFullAggregateTest.java     |  50 ++++-
 .../FederatedMatrixScalarOperationsTest.java       |   5 +-
 .../primitives/FederatedMultiplyTest.java          |   8 +-
 .../primitives/FederatedNegativeTest.java          |  28 ++-
 .../primitives/FederatedQuantileTest.java          | 165 ++++++++++++++++
 .../primitives/FederatedQuantileWeightsTest.java   | 140 ++++++++++++++
 .../federated/primitives/FederatedRCBindTest.java  |  27 ++-
 .../primitives/FederatedRightIndexTest.java        |  51 ++---
 ...ateTest.java => FederatedRowAggregateTest.java} |  86 ++++-----
 .../federated/primitives/FederatedSplitTest.java   |  19 +-
 .../primitives/FederatedStatisticsTest.java        |   8 +-
 .../federated/primitives/FederatedSumTest.java     |  20 +-
 .../TransformFederatedEncodeApplyTest.java         |   6 +-
 .../TransformFederatedEncodeDecodeTest.java        |   6 +-
 .../federated/FederatedCentralMomentTest.dml       |  14 +-
 .../FederatedCentralMomentTestReference.dml        |  12 +-
 .../functions/federated/FederatedCorTest.dml       |  21 ++-
 .../federated/FederatedCorTestReference.dml        |  13 +-
 .../functions/federated/FederatedVarTest.dml       |  20 +-
 .../federated/FederatedVarTestReference.dml        |  14 +-
 .../federated/aggregate/FederatedColVarTest.dml    |  21 ++-
 .../aggregate/FederatedColVarTestReference.dml     |  13 +-
 .../federated/aggregate/FederatedRowVarTest.dml    |  21 ++-
 .../aggregate/FederatedRowVarTestReference.dml     |  13 +-
 .../federated/aggregate/FederatedVarTest.dml       |  20 +-
 .../aggregate/FederatedVarTestReference.dml        |  13 +-
 .../federated/quantile/FederatedIQRTest.dml        |  13 +-
 .../quantile/FederatedIQRTestReference.dml         |  13 +-
 .../federated/quantile/FederatedIQRWeightsTest.dml |  14 +-
 .../quantile/FederatedIQRWeightsTestReference.dml  |  14 +-
 .../federated/quantile/FederatedMedianTest.dml     |  13 +-
 .../quantile/FederatedMedianTestReference.dml      |  13 +-
 .../quantile/FederatedMedianWeightsTest.dml        |  14 +-
 .../FederatedMedianWeightsTestReference.dml        |  14 +-
 .../federated/quantile/FederatedQuantileTest.dml   |  13 +-
 .../quantile/FederatedQuantileTestReference.dml    |  13 +-
 .../quantile/FederatedQuantileWeightsTest.dml      |  14 +-
 .../FederatedQuantileWeightsTestReference.dml      |  14 +-
 .../federated/quantile/FederatedQuantilesTest.dml  |  16 +-
 .../quantile/FederatedQuantilesTestReference.dml   |  16 +-
 .../quantile/FederatedQuantilesWeightsTest.dml     |  17 +-
 .../FederatedQuantilesWeightsTestReference.dml     |  17 +-
 79 files changed, 1966 insertions(+), 758 deletions(-)
 create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/fed/CentralMomentFEDInstruction.java
 create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/fed/QuantilePickFEDInstruction.java
 create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/fed/QuantileSortFEDInstruction.java
 copy src/test/java/org/apache/sysds/test/functions/federated/algorithms/{FederatedUnivarTest.java => FederatedCorTest.java} (61%)
 copy src/test/java/org/apache/sysds/test/functions/federated/{primitives/FederatedRightIndexTest.java => algorithms/FederatedVarTest.java} (62%)
 rename src/test/java/org/apache/sysds/test/functions/federated/primitives/{FederetedCastToFrameTest.java => FederatedCastToFrameTest.java} (94%)
 rename src/test/java/org/apache/sysds/test/functions/federated/primitives/{FederetedCastToMatrixTest.java => FederatedCastToMatrixTest.java} (95%)
 create mode 100644 src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
 copy src/test/java/org/apache/sysds/test/functions/federated/primitives/{FederatedRowColAggregateTest.java => FederatedColAggregateTest.java} (69%)
 create mode 100644 src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileTest.java
 create mode 100644 src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileWeightsTest.java
 rename src/test/java/org/apache/sysds/test/functions/federated/primitives/{FederatedRowColAggregateTest.java => FederatedRowAggregateTest.java} (69%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedCentralMomentTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedCentralMomentTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedCorTest.dml (57%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedCorTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedVarTest.dml (57%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/FederatedVarTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedColVarTest.dml (57%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedColVarTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedRowVarTest.dml (57%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedRowVarTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedVarTest.dml (57%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/aggregate/FederatedVarTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedIQRTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedIQRTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedMedianTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedMedianTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantileTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantileTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantilesTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantilesTestReference.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTest.dml (70%)
 copy scripts/builtin/dist.dml => src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTestReference.dml (70%)


[systemds] 01/03: [SYSTEMDS-2727-9] Federated CM, Var, qsort & qpick

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit d000364e972c6ab2816e0a4990c7ab9c736042c8
Author: Olga <ov...@gmail.com>
AuthorDate: Wed Aug 26 14:44:04 2020 +0200

    [SYSTEMDS-2727-9] Federated CM, Var, qsort & qpick
    
    This commit adds more primitive federated instructions for statistics
    calculations.
    
    - Correlation
    - Variance,
    - Quantile sort and
    - Quantile pick.
    
    closes #1103
    closes #1102
---
 scripts/builtin/dist.dml                           |   5 +-
 src/main/java/org/apache/sysds/lops/MMTSJ.java     |   3 +
 .../controlprogram/federated/FederationUtils.java  | 163 +++++++++++++++-
 .../fed/AggregateUnaryFEDInstruction.java          |  55 +++++-
 .../fed/BinaryMatrixScalarFEDInstruction.java      |   2 +-
 .../fed/CentralMomentFEDInstruction.java           | 187 ++++++++++++++++++
 .../runtime/instructions/fed/FEDInstruction.java   |   4 +-
 .../instructions/fed/FEDInstructionUtils.java      | 120 ++++++++----
 .../fed/QuantilePickFEDInstruction.java            | 210 +++++++++++++++++++++
 .../fed/QuantileSortFEDInstruction.java            | 163 ++++++++++++++++
 .../instructions/fed/ReorgFEDInstruction.java      |   4 +-
 .../instructions/fed/TsmmFEDInstruction.java       |   4 +-
 .../federated/algorithms/FederatedBivarTest.java   |   3 +-
 ...eratedUnivarTest.java => FederatedCorTest.java} | 106 ++++++-----
 .../federated/algorithms/FederatedGLMTest.java     |  22 +--
 .../federated/algorithms/FederatedKmeansTest.java  |  42 ++---
 .../federated/algorithms/FederatedL2SVMTest.java   |   7 +-
 .../federated/algorithms/FederatedLogRegTest.java  |   5 +-
 .../federated/algorithms/FederatedPCATest.java     |  30 ++-
 .../federated/algorithms/FederatedUnivarTest.java  |  23 +--
 .../FederatedVarTest.java}                         |  97 +++-------
 .../functions/federated/io/FederatedSSLTest.java   |   2 +-
 .../federated/io/FederatedWriterTest.java          |   2 +-
 .../paramserv/FederatedParamservTest.java          |   7 +-
 .../primitives/FederatedBinaryMatrixTest.java      |  16 +-
 .../primitives/FederatedBinaryVectorTest.java      |  12 +-
 ...rameTest.java => FederatedCastToFrameTest.java} |   6 +-
 ...rixTest.java => FederatedCastToMatrixTest.java} |   6 +-
 .../primitives/FederatedCentralMomentTest.java     | 147 +++++++++++++++
 .../primitives/FederatedConstructionTest.java      |  21 ++-
 .../primitives/FederatedFullAggregateTest.java     |  13 +-
 .../FederatedMatrixScalarOperationsTest.java       |   5 +-
 .../primitives/FederatedMultiplyTest.java          |   6 +-
 .../primitives/FederatedQuantileTest.java          | 165 ++++++++++++++++
 .../primitives/FederatedQuantileWeightsTest.java   | 140 ++++++++++++++
 .../federated/primitives/FederatedRCBindTest.java  |  23 ++-
 .../primitives/FederatedRightIndexTest.java        |  16 +-
 .../primitives/FederatedRowColAggregateTest.java   |   2 +-
 .../federated/primitives/FederatedSplitTest.java   |  15 +-
 .../primitives/FederatedStatisticsTest.java        |   6 +-
 .../federated/primitives/FederatedSumTest.java     |  20 +-
 .../federated/FederatedCentralMomentTest.dml       |  14 +-
 .../FederatedCentralMomentTestReference.dml        |  12 +-
 .../functions/federated/FederatedCorTest.dml       |  21 ++-
 .../federated/FederatedCorTestReference.dml        |  13 +-
 .../functions/federated/FederatedVarTest.dml       |  20 +-
 .../federated/FederatedVarTestReference.dml        |  14 +-
 .../federated/quantile/FederatedIQRTest.dml        |  13 +-
 .../quantile/FederatedIQRTestReference.dml         |  13 +-
 .../federated/quantile/FederatedIQRWeightsTest.dml |  14 +-
 .../quantile/FederatedIQRWeightsTestReference.dml  |  14 +-
 .../federated/quantile/FederatedMedianTest.dml     |  13 +-
 .../quantile/FederatedMedianTestReference.dml      |  13 +-
 .../quantile/FederatedMedianWeightsTest.dml        |  14 +-
 .../FederatedMedianWeightsTestReference.dml        |  14 +-
 .../federated/quantile/FederatedQuantileTest.dml   |  13 +-
 .../quantile/FederatedQuantileTestReference.dml    |  13 +-
 .../quantile/FederatedQuantileWeightsTest.dml      |  14 +-
 .../FederatedQuantileWeightsTestReference.dml      |  14 +-
 .../federated/quantile/FederatedQuantilesTest.dml  |  16 +-
 .../quantile/FederatedQuantilesTestReference.dml   |  16 +-
 .../quantile/FederatedQuantilesWeightsTest.dml     |  17 +-
 .../FederatedQuantilesWeightsTestReference.dml     |  17 +-
 63 files changed, 1712 insertions(+), 495 deletions(-)

diff --git a/scripts/builtin/dist.dml b/scripts/builtin/dist.dml
index 9c473d8..e5fe930 100644
--- a/scripts/builtin/dist.dml
+++ b/scripts/builtin/dist.dml
@@ -23,7 +23,8 @@
 m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
     G = X %*% t(X);
     I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
+    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
+#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
     Y = sqrt(Y);
     Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sysds/lops/MMTSJ.java b/src/main/java/org/apache/sysds/lops/MMTSJ.java
index 50448d1..73e27d8 100644
--- a/src/main/java/org/apache/sysds/lops/MMTSJ.java
+++ b/src/main/java/org/apache/sysds/lops/MMTSJ.java
@@ -41,6 +41,9 @@ public class MMTSJ extends Lop
 		public boolean isLeft(){
 			return (this == LEFT);
 		}
+		public boolean isRight(){
+			return (this == RIGHT);
+		}
 	}
 	
 	private MMTSJType _type = null;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 37cb7d5..0a24cea 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -35,6 +35,7 @@ import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.Reques
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.CM;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.Mean;
 import org.apache.sysds.runtime.functionobjects.Plus;
@@ -169,11 +170,85 @@ public class FederationUtils {
 		}
 	}
 
-	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
-		if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
-			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN
-			|| ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)
-			|| aop.aggOp.increOp.fn instanceof Mean ))) {
+	public static MatrixBlock aggVar(Future<FederatedResponse>[] ffr, Future<FederatedResponse>[] meanFfr, FederationMap map, boolean isRowAggregate, boolean isScalar) {
+		try {
+//			else if(aop.aggOp.increOp.fn instanceof CM) {
+//				double var = ((ScalarObject) ffr[0].get().getData()[0]).getDoubleValue();
+//				double mean = ((ScalarObject) meanFfr[0].get().getData()[0]).getDoubleValue();
+//				long size = map.getFederatedRanges()[0].getSize();
+//				for(int i = 0; i < ffr.length - 1; i++) {
+//					long l = size + map.getFederatedRanges()[i+1].getSize();
+//					double k = ((size * var) + (map.getFederatedRanges()[i+1].getSize() * ((ScalarObject) ffr[i+1].get().getData()[0]).getDoubleValue())) / l;
+//					var = k + (size * map.getFederatedRanges()[i+1].getSize()) * Math.pow((mean - ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue()) / l, 2);
+//					mean = (mean *  size + ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue() * (map.getFederatedRanges()[i+1].getSize())) / l;
+//					size = l;
+//					System.out.println("Olga");
+//					//					long l = sizes[i] + sizes[i + 1];
+//					//					double k = Math.pow(means[i] - means[i+1], 2) * (sizes[i] * sizes[i+1]);
+//					//					k += ((sizes[i] * vars[i]) + (sizes[i+1] * vars[i+1])) * l;
+//					//					vars[i+1] = k / Math.pow(l, 2);
+//					//
+//					//					means[i+1] = (means[i] * sizes[i] + means[i] * sizes[i]) / l;
+//					//					sizes[i+1] = l;
+//				}
+//				return new DoubleObject(var);
+//
+//			}
+
+
+			FederatedRange[] ranges = map.getFederatedRanges();
+			BinaryOperator plus = InstructionUtils.parseBinaryOperator("+");
+			BinaryOperator minus = InstructionUtils.parseBinaryOperator("-");
+
+			ScalarOperator mult1 = InstructionUtils.parseScalarBinaryOperator("*", false);
+			ScalarOperator dev1 = InstructionUtils.parseScalarBinaryOperator("/", false);
+			ScalarOperator pow = InstructionUtils.parseScalarBinaryOperator("^2", false);
+
+			long size1 = isScalar ? ranges[0].getSize() : ranges[0].getSize(isRowAggregate ? 0 : 1);
+			MatrixBlock var1 = (MatrixBlock)ffr[0].get().getData()[0];
+			MatrixBlock mean1 = (MatrixBlock)meanFfr[0].get().getData()[0];
+			for(int i=0; i < ffr.length - 1; i++) {
+				MatrixBlock var2 = (MatrixBlock)ffr[i+1].get().getData()[0];
+				MatrixBlock mean2 = (MatrixBlock)meanFfr[i+1].get().getData()[0];
+				long size2 = isScalar ? ranges[i+1].getSize() : ranges[i+1].getSize(isRowAggregate ? 0 : 1);
+
+				mult1 = mult1.setConstant(size1);
+				var1 = var1.scalarOperations(mult1, new MatrixBlock());
+				mult1 = mult1.setConstant(size2);
+				var1 = var1.binaryOperationsInPlace(plus, var2.scalarOperations(mult1, new MatrixBlock()));
+				dev1 = dev1.setConstant(size1 + size2);
+				var1 = var1.scalarOperations(dev1, new MatrixBlock());
+
+				MatrixBlock tmp1 = (mean1.binaryOperationsInPlace(minus, mean2)).scalarOperations(dev1, new MatrixBlock());
+				tmp1 = tmp1.scalarOperations(pow, new MatrixBlock());
+				mult1 = mult1.setConstant(size1*size2);
+				tmp1 = tmp1.scalarOperations(mult1, new MatrixBlock());
+
+				var1 = tmp1.binaryOperationsInPlace(plus, var1);
+
+				// next mean
+				mult1 = mult1.setConstant(size1);
+				tmp1 = mean1.scalarOperations(mult1, new MatrixBlock());
+				mult1 = mult1.setConstant(size2);
+				mean1 = tmp1.binaryOperationsInPlace(plus, mean2.scalarOperations(mult1, new MatrixBlock()));
+				mean1 = mean1.scalarOperations(dev1, new MatrixBlock());
+
+				size1 = size1 + size2;
+			}
+
+			return var1;
+		}
+		catch (Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, Future<FederatedResponse>[] meanFfr, FederationMap map) {
+		if(!(aop.aggOp.increOp.fn instanceof KahanFunction || aop.aggOp.increOp.fn instanceof CM ||
+			(aop.aggOp.increOp.fn instanceof Builtin &&
+				(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)
+				|| aop.aggOp.increOp.fn instanceof Mean))) {
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
 				+ aop.aggOp.increOp.getClass().getSimpleName());
 		}
@@ -187,6 +262,27 @@ public class FederationUtils {
 			else if( aop.aggOp.increOp.fn instanceof Mean ) {
 				return new DoubleObject(aggMean(ffr, map).getValue(0,0));
 			}
+			else if(aop.aggOp.increOp.fn instanceof CM) {
+				double var = ((ScalarObject) ffr[0].get().getData()[0]).getDoubleValue();
+				double mean = ((ScalarObject) meanFfr[0].get().getData()[0]).getDoubleValue();
+				long size = map.getFederatedRanges()[0].getSize();
+				for(int i = 0; i < ffr.length - 1; i++) {
+					long l = size + map.getFederatedRanges()[i+1].getSize();
+					double k = ((size * var) + (map.getFederatedRanges()[i+1].getSize() * ((ScalarObject) ffr[i+1].get().getData()[0]).getDoubleValue())) / l;
+					var = k + (size * map.getFederatedRanges()[i+1].getSize()) * Math.pow((mean - ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue()) / l, 2);
+					mean = (mean *  size + ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue() * (map.getFederatedRanges()[i+1].getSize())) / l;
+					size = l;
+//					long l = sizes[i] + sizes[i + 1];
+//					double k = Math.pow(means[i] - means[i+1], 2) * (sizes[i] * sizes[i+1]);
+//					k += ((sizes[i] * vars[i]) + (sizes[i+1] * vars[i+1])) * l;
+//					vars[i+1] = k / Math.pow(l, 2);
+//
+//					means[i+1] = (means[i] * sizes[i] + means[i] * sizes[i]) / l;
+//					sizes[i+1] = l;
+				}
+				return new DoubleObject(var);
+
+			}
 			else { //if (aop.aggOp.increOp.fn instanceof KahanFunction)
 				double sum = 0; //uak+
 				for( Future<FederatedResponse> fr : ffr )
@@ -199,7 +295,7 @@ public class FederationUtils {
 		}
 	}
 
-	public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
+	public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, Future<FederatedResponse>[] meanFfr, FederationMap map) {
 		if (aop.isRowAggregate() && map.getType() == FederationMap.FType.ROW)
 			return bind(ffr, false);
 		else if (aop.isColAggregate() && map.getType() == FederationMap.FType.COL)
@@ -214,7 +310,10 @@ public class FederationUtils {
 				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
 			boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
 			return aggMinMax(ffr,isMin,false, Optional.of(map.getType()));
-		} else
+		} else if(aop.aggOp.increOp.fn instanceof CM) {
+			return aggVar(ffr, meanFfr, map, aop.isRowAggregate(), !(aop.isColAggregate() && aop.isRowAggregate())); //TODO
+		}
+		else
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
 				+ aop.aggOp.increOp.fn.getClass().getSimpleName());
 	}
@@ -229,6 +328,56 @@ public class FederationUtils {
 		}
 	}
 
+	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
+		if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
+			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN
+			|| ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)
+			|| aop.aggOp.increOp.fn instanceof Mean ))) {
+			throw new DMLRuntimeException("Unsupported aggregation operator: "
+				+ aop.aggOp.increOp.getClass().getSimpleName());
+		}
+
+		try {
+			if(aop.aggOp.increOp.fn instanceof Builtin){
+				// then we know it is a Min or Max based on the previous check.
+				boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+				return new DoubleObject(aggMinMax(ffr, isMin, true,  Optional.empty()).getValue(0,0));
+			}
+			else if( aop.aggOp.increOp.fn instanceof Mean ) {
+				return new DoubleObject(aggMean(ffr, map).getValue(0,0));
+			}
+			else { //if (aop.aggOp.increOp.fn instanceof KahanFunction)
+				double sum = 0; //uak+
+				for( Future<FederatedResponse> fr : ffr )
+					sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
+				return new DoubleObject(sum);
+			}
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
+	public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
+		if (aop.isRowAggregate() && map.getType() == FederationMap.FType.ROW)
+			return bind(ffr, false);
+		else if (aop.isColAggregate() && map.getType() == FederationMap.FType.COL)
+			return bind(ffr, true);
+
+		if (aop.aggOp.increOp.fn instanceof KahanFunction)
+			return aggAdd(ffr);
+		else if( aop.aggOp.increOp.fn instanceof Mean )
+			return aggMean(ffr, map);
+		else if (aop.aggOp.increOp.fn instanceof Builtin &&
+			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
+			boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+			return aggMinMax(ffr,isMin,false, Optional.of(map.getType()));
+		} else
+			throw new DMLRuntimeException("Unsupported aggregation operator: "
+				+ aop.aggOp.increOp.fn.getClass().getSimpleName());
+	}
+
 	public static FederationMap federateLocalData(CacheableData<?> data) {
 		long id = FederationUtils.getNextFedDataID();
 		FederatedLocalData federatedLocalData = new FederatedLocalData(id, data);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
index d06dfaa..1429dd3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
@@ -25,13 +25,14 @@ import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.Operator;
 
 public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 	
@@ -39,7 +40,17 @@ public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 			CPOperand out, String opcode, String istr) {
 		super(FEDType.AggregateUnary, auop, in, out, opcode, istr);
 	}
-	
+
+	protected AggregateUnaryFEDInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out,
+										   String opcode, String istr) {
+		super(FEDType.AggregateUnary, op, in1, in2, out, opcode, istr);
+	}
+
+	protected AggregateUnaryFEDInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out,
+										   String opcode, String istr) {
+		super(FEDType.AggregateUnary, op, in1, in2, in3, out, opcode, istr);
+	}
+
 	public static AggregateUnaryFEDInstruction parseInstruction(String str) {
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		String opcode = parts[0];
@@ -53,6 +64,15 @@ public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 	
 	@Override
 	public void processInstruction(ExecutionContext ec) {
+		if (getOpcode().contains("var")) {
+			processVar(ec);
+		}else{
+			processDefault(ec);
+		}
+
+	}
+
+	private void processDefault(ExecutionContext ec){
 		AggregateUnaryOperator aop = (AggregateUnaryOperator) _optr;
 		MatrixObject in = ec.getMatrixObject(input1);
 		FederationMap map = in.getFedMapping();
@@ -70,4 +90,35 @@ public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 		else
 			ec.setMatrixOutput(output.getName(), FederationUtils.aggMatrix(aop, tmp, map));
 	}
+
+	private void processVar(ExecutionContext ec){
+		AggregateUnaryOperator aop = (AggregateUnaryOperator) _optr;
+		MatrixObject in = ec.getMatrixObject(input1);
+		FederationMap map = in.getFedMapping();
+
+		// federated ranges mean for variance
+		Future<FederatedResponse>[] meanTmp = null;
+		if (getOpcode().contains("var")) {
+			String meanInstr = instString.replace(getOpcode(), getOpcode().replace("var", "mean"));
+			//create federated commands for aggregation
+			FederatedRequest meanFr1 = FederationUtils.callInstruction(meanInstr, output,
+				new CPOperand[]{input1}, new long[]{in.getFedMapping().getID()});
+			FederatedRequest meanFr2 = new FederatedRequest(RequestType.GET_VAR, meanFr1.getID());
+			FederatedRequest meanFr3 = map.cleanup(getTID(), meanFr1.getID());
+			meanTmp = map.execute(getTID(), meanFr1, meanFr2, meanFr3);
+		}
+
+		//create federated commands for aggregation
+		FederatedRequest fr1 = FederationUtils.callInstruction(instString, output,
+			new CPOperand[]{input1}, new long[]{in.getFedMapping().getID()});
+		FederatedRequest fr2 = new FederatedRequest(RequestType.GET_VAR, fr1.getID());
+		FederatedRequest fr3 = map.cleanup(getTID(), fr1.getID());
+		
+		//execute federated commands and cleanups
+		Future<FederatedResponse>[] tmp = map.execute(getTID(), fr1, fr2, fr3);
+		if( output.isScalar() )
+			ec.setVariable(output.getName(), FederationUtils.aggScalar(aop, tmp, meanTmp, map));
+		else
+			ec.setMatrixOutput(output.getName(), FederationUtils.aggMatrix(aop, meanTmp, tmp, map));
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
index b6ea1fb..895db4a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixScalarFEDInstruction.java
@@ -38,7 +38,7 @@ public class BinaryMatrixScalarFEDInstruction extends BinaryFEDInstruction
 		CPOperand matrix = input1.isMatrix() ? input1 : input2;
 		CPOperand scalar = input2.isScalar() ? input2 : input1;
 		MatrixObject mo = ec.getMatrixObject(matrix);
-		
+
 		//prepare federated request matrix-scalar
 		FederatedRequest fr1 = !scalar.isLiteral() ?
 			mo.getFedMapping().broadcast(ec.getScalarInput(scalar)) : null;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/CentralMomentFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/CentralMomentFEDInstruction.java
new file mode 100644
index 0000000..cc8e683
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/CentralMomentFEDInstruction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.functionobjects.CM;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.CMOperator;
+
+public class CentralMomentFEDInstruction extends AggregateUnaryFEDInstruction {
+
+    private CentralMomentFEDInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out,
+                                       String opcode, String str) {
+        super(cm, in1, in2, in3, out, opcode, str);
+    }
+
+    public static CentralMomentFEDInstruction parseInstruction(String str) {
+        CPOperand in1 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+        CPOperand in2 = null;
+        CPOperand in3 = null;
+        CPOperand out = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+
+        String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+        String opcode = parts[0];
+
+        //check supported opcode
+        if( !opcode.equalsIgnoreCase("cm") ) {
+            throw new DMLRuntimeException("Unsupported opcode "+opcode);
+        }
+
+        if ( parts.length == 4 ) {
+            // Example: CP.cm.mVar0.Var1.mVar2; (without weights)
+            in2 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+            parseUnaryInstruction(str, in1, in2, out);
+        }
+        else if ( parts.length == 5) {
+            // CP.cm.mVar0.mVar1.Var2.mVar3; (with weights)
+            in2 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+            in3 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+            parseUnaryInstruction(str, in1, in2, in3, out);
+        }
+
+        /*
+         * Exact order of the central moment MAY NOT be known at compilation time.
+         * We first try to parse the second argument as an integer, and if we fail,
+         * we simply pass -1 so that getCMAggOpType() picks up AggregateOperationTypes.INVALID.
+         * It must be updated at run time in processInstruction() method.
+         */
+
+        int cmOrder;
+        try {
+            if ( in3 == null ) {
+                cmOrder = Integer.parseInt(in2.getName());
+            }
+            else {
+                cmOrder = Integer.parseInt(in3.getName());
+            }
+        } catch(NumberFormatException e) {
+            cmOrder = -1; // unknown at compilation time
+        }
+
+        CMOperator.AggregateOperationTypes opType = CMOperator.getCMAggOpType(cmOrder);
+        CMOperator cm = new CMOperator(CM.getCMFnObject(opType), opType);
+        return new CentralMomentFEDInstruction(cm, in1, in2, in3, out, opcode, str);
+    }
+
+    @Override
+    public void processInstruction( ExecutionContext ec ) {
+        MatrixObject mo = ec.getMatrixObject(input1.getName());
+        ScalarObject order = ec.getScalarInput(input3==null ? input2 : input3);
+
+        CMOperator cm_op = ((CMOperator) _optr);
+        if(cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID)
+            cm_op = cm_op.setCMAggOp((int) order.getLongValue());
+
+        FederationMap fedMapping = mo.getFedMapping();
+        List<CM_COV_Object> globalCmobj = new ArrayList<>();
+
+        long varID = FederationUtils.getNextFedDataID();
+        CMOperator finalCm_op = cm_op;
+        fedMapping.mapParallel(varID, (range, data) -> {
+
+            FederatedResponse response;
+            try {
+                if (input3 == null ) {
+                    response = data.executeFederatedOperation(
+                        new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+                            new CentralMomentFEDInstruction.CMFunction(data.getVarID(), finalCm_op))).get();
+                } else {
+                    MatrixBlock wtBlock = ec.getMatrixInput(input2.getName());
+
+                    response = data.executeFederatedOperation(
+                        new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+                            new CentralMomentFEDInstruction.CMWeightsFunction(data.getVarID(), finalCm_op, wtBlock))).get();
+                }
+
+                if(!response.isSuccessful())
+                    response.throwExceptionFromResponse();
+               synchronized(globalCmobj) {
+                   globalCmobj.add((CM_COV_Object) response.getData()[0]);
+               }
+            }
+            catch(Exception e) {
+                throw new DMLRuntimeException(e);
+            }
+            return null;
+        });
+
+        Optional<CM_COV_Object> res = globalCmobj.stream().reduce((arg0, arg1) -> (CM_COV_Object) finalCm_op.fn.execute(arg0, arg1));
+        try {
+            ec.setScalarOutput(output.getName(), new DoubleObject(res.get().getRequiredResult(finalCm_op)));
+        }
+        catch(Exception e) {
+            throw new DMLRuntimeException(e);
+        }
+    }
+
+    private static class CMFunction extends FederatedUDF {
+        private static final long serialVersionUID = 7460149207607220994L;
+        private final CMOperator _op;
+
+        public CMFunction (long input, CMOperator op) {
+            super(new long[] {input});
+            _op = op;
+        }
+
+        @Override
+        public FederatedResponse execute(ExecutionContext ec, Data... data) {
+            MatrixBlock mb = ((MatrixObject) data[0]).acquireReadAndRelease();
+            return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, mb.cmOperations(_op));
+        }
+    }
+
+
+    private static class CMWeightsFunction extends FederatedUDF {
+        private static final long serialVersionUID = -3685746246551622021L;
+        private final CMOperator _op;
+        private final MatrixBlock _weights;
+
+        protected CMWeightsFunction(long input, CMOperator op, MatrixBlock weights) {
+            super(new long[] {input});
+            _op = op;
+            _weights = weights;
+        }
+
+        @Override
+        public FederatedResponse execute(ExecutionContext ec, Data... data) {
+            MatrixBlock mb = ((MatrixObject) data[0]).acquireReadAndRelease();
+            return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, mb.cmOperations(_op, _weights));
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
index 8094c96..0d912d8 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
@@ -37,7 +37,9 @@ public abstract class FEDInstruction extends Instruction {
 		Tsmm,
 		MMChain,
 		Reorg,
-		MatrixIndexing
+		MatrixIndexing,
+		QSort,
+		QPick
 	}
 	
 	protected final FEDType _fedType;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index ef66b66..1b095e1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.instructions.fed;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
@@ -30,18 +31,25 @@ import org.apache.sysds.runtime.instructions.cp.AggregateBinaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.BinaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MMChainCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MatrixIndexingCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MultiReturnParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ReorgCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.UnaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction.VariableOperationCode;
 import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysds.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.AppendGSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.BinarySPInstruction;
+import org.apache.sysds.runtime.instructions.spark.CentralMomentSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.QuantilePickSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.QuantileSortSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.UnarySPInstruction;
 import org.apache.sysds.runtime.instructions.spark.WriteSPInstruction;
 
 public class FEDInstructionUtils {
@@ -53,9 +61,9 @@ public class FEDInstructionUtils {
 
 	/**
 	 * Check and replace CP instructions with federated instructions if the instruction match criteria.
-	 * 
+	 *
 	 * @param inst The instruction to analyse
-	 * @param ec The Execution Context 
+	 * @param ec The Execution Context
 	 * @return The potentially modified instruction
 	 */
 	public static Instruction checkAndReplaceCP(Instruction inst, ExecutionContext ec) {
@@ -82,14 +90,32 @@ public class FEDInstructionUtils {
 			if( mo.isFederated() )
 				fedinst = TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
 		}
-		else if (inst instanceof AggregateUnaryCPInstruction) {
-			AggregateUnaryCPInstruction instruction = (AggregateUnaryCPInstruction) inst;
-			if( instruction.input1.isMatrix() && ec.containsVariable(instruction.input1) ) {
+		else if (inst instanceof UnaryCPInstruction && ! (inst instanceof IndexingCPInstruction)) {
+			UnaryCPInstruction instruction = (UnaryCPInstruction) inst;
+			if(inst instanceof ReorgCPInstruction && inst.getOpcode().equals("r'")) {
+				ReorgCPInstruction rinst = (ReorgCPInstruction) inst;
+				CacheableData<?> mo = ec.getCacheableData(rinst.input1);
+
+				if((mo instanceof MatrixObject || mo instanceof FrameObject) && mo.isFederated() )
+					fedinst = ReorgFEDInstruction.parseInstruction(rinst.getInstructionString());
+			}
+			else if(instruction.input1 != null && instruction.input1.isMatrix() 
+				&& ec.getMatrixObject(instruction.input1).isFederated()
+				&& ec.containsVariable(instruction.input1)) {
+				
 				MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
-				if (mo1.isFederated() && instruction.getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT){
-					LOG.debug("Federated UnaryAggregate");
+				
+				if(instruction.getOpcode().equalsIgnoreCase("cm")) {
+					fedinst = CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
+				}
+				else if(inst instanceof AggregateUnaryCPInstruction  &&
+					((AggregateUnaryCPInstruction) instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT) {
 					fedinst = AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
 				}
+				else if(inst.getOpcode().equalsIgnoreCase("qsort") &&
+					mo1.getFedMapping().getFederatedRanges().length == 1) {
+					fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+				}
 			}
 		}
 		else if (inst instanceof BinaryCPInstruction) {
@@ -98,6 +124,8 @@ public class FEDInstructionUtils {
 				|| (instruction.input2.isMatrix() && ec.getMatrixObject(instruction.input2).isFederated()) ) {
 				if(instruction.getOpcode().equals("append"))
 					fedinst = AppendFEDInstruction.parseInstruction(inst.getInstructionString());
+				else if(instruction.getOpcode().equals("qpick"))
+					fedinst = QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
 				else
 					fedinst = BinaryFEDInstruction.parseInstruction(inst.getInstructionString());
 			}
@@ -122,19 +150,14 @@ public class FEDInstructionUtils {
 				}
 			}
 		}
-		else if(inst instanceof ReorgCPInstruction && inst.getOpcode().equals("r'")) {
-			ReorgCPInstruction rinst = (ReorgCPInstruction) inst;
-			CacheableData<?> mo = ec.getCacheableData(rinst.input1);
-			if( mo.isFederated() )
-				fedinst = ReorgFEDInstruction.parseInstruction(rinst.getInstructionString());
-		}
-		else if(inst instanceof MatrixIndexingCPInstruction && inst.getOpcode().equalsIgnoreCase("rightIndex")) {
+		else if(inst instanceof MatrixIndexingCPInstruction) {
 			// matrix indexing
+			LOG.info("Federated Indexing");
 			MatrixIndexingCPInstruction minst = (MatrixIndexingCPInstruction) inst;
-			if(minst.input1.isMatrix()) {
-				CacheableData<?> fo = ec.getCacheableData(minst.input1);
-				if(fo.isFederated())
-					fedinst = MatrixIndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+			if(inst.getOpcode().equalsIgnoreCase("rightIndex") 
+				&& minst.input1.isMatrix() && ec.getCacheableData(minst.input1).isFederated()) {
+				LOG.info("Federated Right Indexing");
+				fedinst = MatrixIndexingFEDInstruction.parseInstruction(minst.getInstructionString());
 			}
 		}
 		else if(inst instanceof VariableCPInstruction ){
@@ -178,11 +201,47 @@ public class FEDInstructionUtils {
 					instruction.input1, instruction.input2, instruction.output, "ba+*", "FED...");
 			}
 		}
-		else if (inst instanceof AggregateUnarySPInstruction) {
-			AggregateUnarySPInstruction instruction = (AggregateUnarySPInstruction) inst;
-			Data data = ec.getVariable(instruction.input1);
-			if (data instanceof MatrixObject && ((MatrixObject) data).isFederated())
-				fedinst = AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
+		else if (inst instanceof UnarySPInstruction) {
+			if (inst instanceof CentralMomentSPInstruction) {
+				CentralMomentSPInstruction instruction = (CentralMomentSPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if (data instanceof MatrixObject && ((MatrixObject) data).isFederated())
+					fedinst = CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
+			} else if (inst instanceof QuantileSortSPInstruction) {
+				QuantileSortSPInstruction instruction = (QuantileSortSPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if (data instanceof MatrixObject && ((MatrixObject) data).isFederated())
+					fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+			}
+			else if (inst instanceof AggregateUnarySPInstruction) {
+				AggregateUnarySPInstruction instruction = (AggregateUnarySPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if(data instanceof MatrixObject && ((MatrixObject) data).isFederated())
+					fedinst = AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
+			}
+		}
+		else if(inst instanceof BinarySPInstruction) {
+			if(inst instanceof QuantilePickSPInstruction) {
+				QuantilePickSPInstruction instruction = (QuantilePickSPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if(data instanceof MatrixObject && ((MatrixObject) data).isFederated())
+					fedinst = QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
+			}
+			else if (inst instanceof AppendGAlignedSPInstruction) {
+				// TODO other Append Spark instructions
+				AppendGAlignedSPInstruction instruction = (AppendGAlignedSPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if (data instanceof MatrixObject && ((MatrixObject) data).isFederated()) {
+					fedinst = AppendFEDInstruction.parseInstruction(instruction.getInstructionString());
+				}
+			}
+			else if (inst instanceof AppendGSPInstruction) {
+				AppendGSPInstruction instruction = (AppendGSPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if(data instanceof MatrixObject && ((MatrixObject) data).isFederated()) {
+					fedinst = AppendFEDInstruction.parseInstruction(instruction.getInstructionString());
+				}
+			}
 		}
 		else if (inst instanceof WriteSPInstruction) {
 			WriteSPInstruction instruction = (WriteSPInstruction) inst;
@@ -193,21 +252,6 @@ public class FEDInstructionUtils {
 				return VariableCPInstruction.parseInstruction(instruction.getInstructionString());
 			}
 		}
-		else if (inst instanceof AppendGAlignedSPInstruction) {
-			// TODO other Append Spark instructions
-			AppendGAlignedSPInstruction instruction = (AppendGAlignedSPInstruction) inst;
-			Data data = ec.getVariable(instruction.input1);
-			if (data instanceof MatrixObject && ((MatrixObject) data).isFederated()) {
-				fedinst = AppendFEDInstruction.parseInstruction(instruction.getInstructionString());
-			}
-		}
-		else if (inst instanceof AppendGSPInstruction) {
-			AppendGSPInstruction instruction = (AppendGSPInstruction) inst;
-			Data data = ec.getVariable(instruction.input1);
-			if(data instanceof MatrixObject && ((MatrixObject) data).isFederated()) {
-				fedinst = AppendFEDInstruction.parseInstruction(instruction.getInstructionString());
-			}
-		}
 		//set thread id for federated context management
 		if( fedinst != null ) {
 			fedinst.setTID(ec.getTID());
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantilePickFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantilePickFEDInstruction.java
new file mode 100644
index 0000000..f2052b5
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantilePickFEDInstruction.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sysds.lops.PickByCount.OperationTypes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class QuantilePickFEDInstruction extends BinaryFEDInstruction {
+
+	private final OperationTypes _type;
+
+	private QuantilePickFEDInstruction(Operator op, CPOperand in, CPOperand out, OperationTypes type, boolean inmem,
+			String opcode, String istr) {
+		this(op, in, null, out, type, inmem, opcode, istr);
+	}
+
+	private QuantilePickFEDInstruction(Operator op, CPOperand in, CPOperand in2, CPOperand out, OperationTypes type,
+			boolean inmem, String opcode, String istr) {
+		super(FEDType.QPick, op, in, in2, out, opcode, istr);
+		_type = type;
+	}
+
+	public static QuantilePickFEDInstruction parseInstruction ( String str ) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		String opcode = parts[0];
+		if ( !opcode.equalsIgnoreCase("qpick") )
+			throw new DMLRuntimeException("Unknown opcode while parsing a QuantilePickCPInstruction: " + str);
+		//instruction parsing
+		if( parts.length == 4 ) {
+			//instructions of length 4 originate from unary - mr-iqm
+			//TODO this should be refactored to use pickvaluecount lops
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			CPOperand out = new CPOperand(parts[3]);
+			OperationTypes ptype = OperationTypes.IQM;
+			boolean inmem = false;
+			return new QuantilePickFEDInstruction(null, in1, in2, out, ptype, inmem, opcode, str);
+		}
+		else if( parts.length == 5 ) {
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand out = new CPOperand(parts[2]);
+			OperationTypes ptype = OperationTypes.valueOf(parts[3]);
+			boolean inmem = Boolean.parseBoolean(parts[4]);
+			return new QuantilePickFEDInstruction(null, in1, out, ptype, inmem, opcode, str);
+		}
+		else if( parts.length == 6 ) {
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			CPOperand out = new CPOperand(parts[3]);
+			OperationTypes ptype = OperationTypes.valueOf(parts[4]);
+			boolean inmem = Boolean.parseBoolean(parts[5]);
+			return new QuantilePickFEDInstruction(null, in1, in2, out, ptype, inmem, opcode, str);
+		}
+		return null;
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		MatrixObject in = ec.getMatrixObject(input1);
+		FederationMap fedMapping = in.getFedMapping();
+
+		List <Object> res = new ArrayList<>();
+		long varID = FederationUtils.getNextFedDataID();
+		fedMapping.mapParallel(varID, (range, data) -> {
+			FederatedResponse response;
+			try {
+				switch( _type )
+				{
+					case VALUEPICK:
+						if(input2.isScalar()) {
+							ScalarObject quantile = ec.getScalarInput(input2);
+							response = data.executeFederatedOperation(
+								new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF,-1,
+								new QuantilePickFEDInstruction.ValuePick(data.getVarID(), quantile))).get();
+						}
+						else {
+							MatrixBlock quantiles = ec.getMatrixInput(input2.getName());
+							response = data.executeFederatedOperation(
+								new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF,-1,
+								new QuantilePickFEDInstruction.ValuePick(data.getVarID(), quantiles))).get();
+						}
+						break;
+					case IQM:
+						response = data
+							.executeFederatedOperation(
+								new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+								new QuantilePickFEDInstruction.IQM(data.getVarID()))).get();
+						break;
+					case MEDIAN:
+						response = data
+							.executeFederatedOperation(new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+								new QuantilePickFEDInstruction.Median(data.getVarID()))).get();
+						break;
+					default:
+						throw new DMLRuntimeException("Unsupported qpick operation type: "+_type);
+				}
+
+				if(!response.isSuccessful())
+					response.throwExceptionFromResponse();
+
+				res.add(response.getData()[0]);
+			}
+			catch(Exception e) {
+				throw new DMLRuntimeException(e);
+			}
+			return null;
+		});
+
+		assert res.size() == 1;
+
+		if(output.isScalar())
+			ec.setScalarOutput(output.getName(), new DoubleObject((double) res.get(0)));
+		else
+			ec.setMatrixOutput(output.getName(), (MatrixBlock) res.get(0));
+	}
+
+	private static class ValuePick extends FederatedUDF {
+
+		private static final long serialVersionUID = -2594912886841345102L;
+		private final MatrixBlock _quantiles;
+
+		protected ValuePick(long input, ScalarObject quantile) {
+			super(new long[] {input});
+			_quantiles = new MatrixBlock(quantile.getDoubleValue());
+		}
+
+		protected ValuePick(long input, MatrixBlock quantiles) {
+			super(new long[] {input});
+			_quantiles = quantiles;
+		}
+		@Override
+		public FederatedResponse execute(ExecutionContext ec, Data... data) {
+			MatrixBlock mb = ((MatrixObject)data[0]).acquireReadAndRelease();
+			MatrixBlock picked;
+			if (_quantiles.getLength() == 1) {
+				return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS,
+					new Object[] {mb.pickValue(_quantiles.getValue(0, 0))});
+			}
+			else {
+				picked = mb.pickValues(_quantiles, new MatrixBlock());
+				return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS,
+					new Object[] {picked});
+			}
+		}
+	}
+
+	private static class IQM extends FederatedUDF {
+
+		private static final long serialVersionUID = 2223186699111957677L;
+
+		protected IQM(long input) {
+			super(new long[] {input});
+		}
+		@Override
+		public FederatedResponse execute(ExecutionContext ec, Data... data) {
+			MatrixBlock mb = ((MatrixObject)data[0]).acquireReadAndRelease();
+			return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS,
+				new Object[] {mb.interQuartileMean()});
+		}
+	}
+
+	private static class Median extends FederatedUDF {
+
+		private static final long serialVersionUID = -2808597461054603816L;
+
+		protected Median(long input) {
+			super(new long[] {input});
+		}
+		@Override
+		public FederatedResponse execute(ExecutionContext ec, Data... data) {
+			MatrixBlock mb = ((MatrixObject)data[0]).acquireReadAndRelease();
+			return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS,
+				new Object[] {mb.median()});
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantileSortFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantileSortFEDInstruction.java
new file mode 100644
index 0000000..0e994d9
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/QuantileSortFEDInstruction.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.SortKeys;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class QuantileSortFEDInstruction extends UnaryFEDInstruction{
+
+	private QuantileSortFEDInstruction(CPOperand in, CPOperand out, String opcode, String istr) {
+		this(in, null, out, opcode, istr);
+	}
+
+	private QuantileSortFEDInstruction(CPOperand in1, CPOperand in2, CPOperand out, String opcode,
+		String istr) {
+		super(FEDInstruction.FEDType.QSort, null, in1, in2, out, opcode, istr);
+	}
+
+	public static QuantileSortFEDInstruction parseInstruction ( String str ) {
+		CPOperand in1 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+		CPOperand in2 = null;
+		CPOperand out = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		String opcode = parts[0];
+
+		if ( opcode.equalsIgnoreCase(SortKeys.OPCODE) ) {
+			if ( parts.length == 3 ) {
+				// Example: sort:mVar1:mVar2 (input=mVar1, output=mVar2)
+				parseUnaryInstruction(str, in1, out);
+				return new QuantileSortFEDInstruction(in1, out, opcode, str);
+			}
+			else if ( parts.length == 4 ) {
+				// Example: sort:mVar1:mVar2:mVar3 (input=mVar1, weights=mVar2, output=mVar3)
+				in2 = new CPOperand("", Types.ValueType.UNKNOWN, Types.DataType.UNKNOWN);
+				parseUnaryInstruction(str, in1, in2, out);
+				return new QuantileSortFEDInstruction(in1, in2, out, opcode, str);
+			}
+			else {
+				throw new DMLRuntimeException("Invalid number of operands in instruction: " + str);
+			}
+		}
+		else {
+			throw new DMLRuntimeException("Unknown opcode while parsing a QuantileSortFEDInstruction: " + str);
+		}
+	}
+
+
+//	@Override
+//	public void processInstruction(ExecutionContext ec) {
+//		MatrixObject in = ec.getMatrixObject(input1.getName());
+//		FederationMap map = in.getFedMapping();
+//
+//		//create federated commands for aggregation
+//		FederatedRequest fr1 = FederationUtils
+//			.callInstruction(instString, output, new CPOperand[] {input1}, new long[] {in.getFedMapping().getID()});
+//		FederatedRequest fr2 = new FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
+//		FederatedRequest fr3 = map.cleanup(getTID(), fr1.getID());
+//
+//		Future<FederatedResponse>[] tmp = map.execute(getTID(), fr1, fr2, fr3);
+//
+//		try {
+//			Object d = tmp[0].get().getData()[0];
+//			System.out.println(1);
+//		}
+//		catch(Exception e) {
+//			e.printStackTrace();
+//		}
+//
+//		MatrixObject out = ec.getMatrixObject(output);
+//		out.getDataCharacteristics().set(in.getDataCharacteristics());
+//		out.setFedMapping(in.getFedMapping().copyWithNewID(fr2.getID()));
+//	}
+
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		MatrixObject in = ec.getMatrixObject(input1);
+		FederationMap fedMapping = in.getFedMapping();
+
+		long varID = FederationUtils.getNextFedDataID();
+		FederationMap sortedMapping = fedMapping.mapParallel(varID, (range, data) -> {
+			try {
+				MatrixBlock wtBlock = null;
+				if (input2 != null) {
+					wtBlock = ec.getMatrixInput(input2.getName());
+				}
+
+				FederatedResponse response = data
+					.executeFederatedOperation(new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+						new GetSorted(data.getVarID(), varID, wtBlock))).get();
+				if(!response.isSuccessful())
+					response.throwExceptionFromResponse();
+			}
+			catch(Exception e) {
+				throw new DMLRuntimeException(e);
+			}
+			return null;
+		});
+
+
+		MatrixObject sorted = ec.getMatrixObject(output);
+		sorted.getDataCharacteristics().set(in.getDataCharacteristics());
+
+		// set the federated mapping for the matrix
+		sorted.setFedMapping(sortedMapping);
+	}
+
+	private static class GetSorted extends FederatedUDF {
+
+		private static final long serialVersionUID = -1969015577260167645L;
+		private final long _outputID;
+		private final MatrixBlock _weights;
+
+		protected GetSorted(long input, long outputID, MatrixBlock weights) {
+			super(new long[] {input});
+			_outputID = outputID;
+			_weights = weights;
+		}
+		@Override
+		public FederatedResponse execute(ExecutionContext ec, Data... data) {
+			MatrixBlock mb = ((MatrixObject) data[0]).acquireReadAndRelease();
+
+			MatrixBlock res = mb.sortOperations(_weights, new MatrixBlock());
+
+			MatrixObject mout = ExecutionContext.createMatrixObject(res);
+
+			// add it to the list of variables
+			ec.setVariable(String.valueOf(_outputID), mout);
+			// return schema
+			return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/ReorgFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/ReorgFEDInstruction.java
index a4b604b..d9e9d97 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/ReorgFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/ReorgFEDInstruction.java
@@ -54,12 +54,12 @@ public class ReorgFEDInstruction extends UnaryFEDInstruction {
 		if( !mo1.isFederated() )
 			throw new DMLRuntimeException("Federated Reorg: "
 				+ "Federated input expected, but invoked w/ "+mo1.isFederated());
-	
+
 		//execute transpose at federated site
 		FederatedRequest fr1 = FederationUtils.callInstruction(instString, output,
 			new CPOperand[]{input1}, new long[]{mo1.getFedMapping().getID()});
 		mo1.getFedMapping().execute(getTID(), true, fr1);
-		
+
 		//drive output federated mapping
 		MatrixObject out = ec.getMatrixObject(output);
 		out.getDataCharacteristics().set(mo1.getNumColumns(),
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
index fbe88d6..ed9615f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
@@ -25,12 +25,14 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
+import java.util.Arrays;
 import java.util.concurrent.Future;
 
 public class TsmmFEDInstruction extends BinaryFEDInstruction {
@@ -62,7 +64,7 @@ public class TsmmFEDInstruction extends BinaryFEDInstruction {
 	public void processInstruction(ExecutionContext ec) {
 		MatrixObject mo1 = ec.getMatrixObject(input1);
 		
-		if(mo1.isFederated() && _type.isLeft()) { // left tsmm
+		if((_type.isLeft() && mo1.isFederated(FederationMap.FType.ROW)) || (mo1.isFederated(FederationMap.FType.COL) && _type.isRight())) {
 			//construct commands: fed tsmm, retrieve results
 			FederatedRequest fr1 = FederationUtils.callInstruction(instString, output,
 				new CPOperand[]{input1}, new long[]{mo1.getFedMapping().getID()});
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index 3a391d9..ced8bca 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -55,7 +55,8 @@ public class FederatedBivarTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(new Object[][] {{10000, 16},
-			// {2000, 32}, {1000, 64},
+			// {2000, 32},
+			// {1000, 64},
 			{10000, 128}});
 	}
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
similarity index 65%
copy from src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
copy to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
index f24aa6c..1b06279 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
@@ -19,7 +19,11 @@
 
 package org.apache.sysds.test.functions.federated.algorithms;
 
-import org.apache.sysds.common.Types;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -30,70 +34,67 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
-
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedUnivarTest extends AutomatedTestBase {
+public class FederatedCorTest extends AutomatedTestBase {
+
+	private final static String TEST_NAME = "FederatedCorTest";
 	private final static String TEST_DIR = "functions/federated/";
-	private final static String TEST_NAME = "FederatedUnivarTest";
-	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedCorTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
+
 	@Parameterized.Parameter()
 	public int rows;
 	@Parameterized.Parameter(1)
 	public int cols;
-
-	@Override
-	public void setUp() {
-		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
-	}
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {
-				{10000, 16},
-				{2000, 32}, {1000, 64}, {10000, 128}
-		});
+		return Arrays.asList(new Object[][] {{1600, 8, true}});
 	}
 
-	@Test
-	public void federatedUnivarSinglenode() {
-		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
 	}
 
 	@Test
-	public void federatedUnivarHybrid() {
-		federatedL2SVM(Types.ExecMode.HYBRID);
+	public void testCorCP() {
+		runAggregateOperationTest(ExecMode.SINGLE_NODE);
 	}
 
-	public void federatedL2SVM(Types.ExecMode execMode) {
-		Types.ExecMode platformOld = setExecMode(execMode);
+	private void runAggregateOperationTest(ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode platformOld = rtplatform;
+
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		// write input matrices
-		int quarterCols = cols / 4;
-		// We have two matrices handled by a single federated worker
-		double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
-		double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
-		double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
-		double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
-
-		// write types matrix shape of (1, D)
-		double [][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
-		Arrays.stream(Y[0]).forEach(Math::ceil);
-
-		MatrixCharacteristics mc= new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
 		writeInputMatrixWithMTD("X2", X2, false, mc);
 		writeInputMatrixWithMTD("X3", X3, false, mc);
 		writeInputMatrixWithMTD("X4", X4, false, mc);
-		writeInputMatrixWithMTD("Y", Y, false);
 
 		// empty script name because we don't execute any script, just start the worker
 		fullDMLScriptName = "";
@@ -106,41 +107,46 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 		Thread t3 = startLocalFedWorkerThread(port3);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
 
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("Y"), expected("B")};
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+			expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
 		runTest(true, false, null, -1);
 
 		// Run actual dml script with federated matrix
+
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats",  "100", "-nvargs",
+		programArgs = new String[] {"-explain", "-stats", "100", "-nvargs",
 			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
-			"in_Y=" + input("Y"), // types
-			"rows=" + rows, "cols=" + cols,
-			"out=" + output("B")};
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+
 		runTest(true, false, null, -1);
 
 		// compare via files
 		compareResults(1e-9);
-		TestUtils.shutdownThreads(t1, t2, t3, t4);
 
-		// check for federated operations
-		Assert.assertTrue(heavyHittersContainsString("fed_uacmax"));
+		// Assert.assertTrue(heavyHittersContainsString("k+"));
 
-		//check that federated input files are still existing
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
-		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("Y")));
 
-		resetExecMode(platformOld);
+		TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index 44de28f..1e608ce 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -57,7 +57,9 @@ public class FederatedGLMTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{10000, 10}, {1000, 100}, {2000, 43}});
+		return Arrays.asList(new Object[][] {
+			// {10000, 10}, {1000, 100},
+			{2000, 43}});
 	}
 
 	@Test
@@ -70,7 +72,6 @@ public class FederatedGLMTest extends AutomatedTestBase {
 		federatedGLM(Types.ExecMode.HYBRID);
 	}
 
-	
 	public void federatedGLM(Types.ExecMode execMode) {
 		ExecMode platformOld = setExecMode(execMode);
 
@@ -99,8 +100,8 @@ public class FederatedGLMTest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		// 
-		
+		//
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"), expected("Z")};
@@ -108,8 +109,7 @@ public class FederatedGLMTest extends AutomatedTestBase {
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats",
-			"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
 			"in_Y=" + input("Y"), "out=" + output("Z")};
 		runTest(true, false, null, -1);
@@ -121,15 +121,15 @@ public class FederatedGLMTest extends AutomatedTestBase {
 
 		// check for federated operations
 		Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
-		Assert.assertTrue(heavyHittersContainsString("fed_uark+","fed_uarsqk+"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uark+", "fed_uarsqk+"));
 		Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
-		//Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+		// Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
 		Assert.assertTrue(heavyHittersContainsString("fed_mmchain"));
-		
-		//check that federated input files are still existing
+
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
-		
+
 		resetExecMode(platformOld);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index 0dd339f..8a33d20 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -64,12 +64,13 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {
-			{10000, 10, 1, 1}, {2000, 50, 1, 1}, {1000, 100, 1, 1},
-			{10000, 10, 2, 1}, {2000, 50, 2, 1}, {1000, 100, 2, 1}, //concurrent requests
-			{10000, 10, 2, 2}, //repeated exec
-			//TODO more runs e.g., 16 -> but requires rework RPC framework first
-			//(e.g., see paramserv?)
+		return Arrays.asList(new Object[][] {{10000, 10, 1, 1},
+			// {2000, 50, 1, 1}, {1000, 100, 1, 1},
+			{10000, 10, 2, 1},
+			// {2000, 50, 2, 1}, {1000, 100, 2, 1}, //concurrent requests
+			{10000, 10, 2, 2}, // repeated exec
+			// TODO more runs e.g., 16 -> but requires rework RPC framework first
+			// (e.g., see paramserv?)
 		});
 	}
 
@@ -77,7 +78,7 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 	public void federatedKmeansSinglenode() {
 		federatedKmeans(Types.ExecMode.SINGLE_NODE);
 	}
-	
+
 	@Test
 	public void federatedKmeansHybrid() {
 		federatedKmeans(Types.ExecMode.HYBRID);
@@ -106,29 +107,26 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
-		
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-args", input("X1"), input("X2"),
-			String.valueOf(runs), expected("Z")};
+		programArgs = new String[] {"-args", input("X1"), input("X2"), String.valueOf(runs), expected("Z")};
 		runTest(true, false, null, -1);
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats",
-			"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
 			"runs=" + String.valueOf(runs), "out=" + output("Z")};
-		
-		for( int i=0; i<rep; i++ ) {
+
+		for(int i = 0; i < rep; i++) {
 			ParForProgramBlock.resetWorkerIDs();
 			FederationUtils.resetFedDataID();
 			runTest(true, false, null, -1);
-		
+
 			// check for federated operations
 			Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
-			//Assert.assertTrue(heavyHittersContainsString("fed_uasqk+"));
+			// Assert.assertTrue(heavyHittersContainsString("fed_uasqk+"));
 			Assert.assertTrue(heavyHittersContainsString("fed_uarmin"));
 			Assert.assertTrue(heavyHittersContainsString("fed_uark+"));
 			Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
@@ -137,16 +135,16 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 			Assert.assertTrue(heavyHittersContainsString("fed_<="));
 			Assert.assertTrue(heavyHittersContainsString("fed_/"));
 			Assert.assertTrue(heavyHittersContainsString("fed_r'"));
-			
-			//check that federated input files are still existing
+
+			// check that federated input files are still existing
 			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
 		}
-		
+
 		// compare via files
-		//compareResults(1e-9); --> randomized
+		// compareResults(1e-9); --> randomized
 		TestUtils.shutdownThreads(t1, t2);
-		
+
 		resetExecMode(platformOld);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index e24935d..53bfc8d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -55,7 +55,9 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{2, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}});
+		return Arrays.asList(new Object[][] {
+			// {2, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000},
+			{2000, 10}});
 	}
 
 	@Test
@@ -102,8 +104,7 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
-		
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"), expected("Z")};
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index 0550ec9..42c614b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -57,7 +57,9 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{10000, 10}, {1000, 100}, {2000, 43}});
+		return Arrays.asList(new Object[][] {
+			// {10000, 10}, {1000, 100},
+			{2000, 43}});
 	}
 
 	@Test
@@ -98,7 +100,6 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
 
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 33179b7..99c90ee 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -60,9 +60,10 @@ public class FederatedPCATest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {
-			{10000, 10, false}, {2000, 50, false}, {1000, 100, false},
-			{10000, 10, true}, {2000, 50, true}, {1000, 100, true}
+		return Arrays.asList(new Object[][] {{10000, 10, false},
+			// {2000, 50, false}, {1000, 100, false},
+			{10000, 10, true},
+			// {2000, 50, true}, {1000, 100, true}
 		});
 	}
 
@@ -70,7 +71,7 @@ public class FederatedPCATest extends AutomatedTestBase {
 	public void federatedPCASinglenode() {
 		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
 	}
-	
+
 	@Test
 	public void federatedPCAHybrid() {
 		federatedL2SVM(Types.ExecMode.HYBRID);
@@ -89,7 +90,7 @@ public class FederatedPCATest extends AutomatedTestBase {
 		double[][] X2 = getRandomMatrix(quarterRows, cols, 0, 1, 1, 7);
 		double[][] X3 = getRandomMatrix(quarterRows, cols, 0, 1, 1, 8);
 		double[][] X4 = getRandomMatrix(quarterRows, cols, 0, 1, 1, 9);
-		MatrixCharacteristics mc= new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+		MatrixCharacteristics mc = new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
 		writeInputMatrixWithMTD("X2", X2, false, mc);
 		writeInputMatrixWithMTD("X3", X3, false, mc);
@@ -108,8 +109,7 @@ public class FederatedPCATest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
-		
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
@@ -118,37 +118,35 @@ public class FederatedPCATest extends AutomatedTestBase {
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats", "-nvargs", 
-			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
-			"rows=" + rows, "cols=" + cols,
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
 			"scaleAndShift=" + String.valueOf(scaleAndShift).toUpperCase(), "out=" + output("Z")};
 		runTest(true, false, null, -1);
 
 		// compare via files
 		compareResults(1e-9);
 		TestUtils.shutdownThreads(t1, t2, t3, t4);
-		
+
 		// check for federated operations
 		Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
 		Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
 		Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
-		if( scaleAndShift ) {
+		if(scaleAndShift) {
 			Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
 			Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
 			Assert.assertTrue(heavyHittersContainsString("fed_-"));
 			Assert.assertTrue(heavyHittersContainsString("fed_/"));
 			Assert.assertTrue(heavyHittersContainsString("fed_replace"));
 		}
-		
-		//check that federated input files are still existing
+
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
-		
+
 		resetExecMode(platformOld);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index f24aa6c..588796a 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -55,9 +55,8 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(new Object[][] {
-				{10000, 16},
-				{2000, 32}, {1000, 64}, {10000, 128}
-		});
+			// {10000, 16},{2000, 32}, {1000, 64},
+			{10000, 128}});
 	}
 
 	@Test
@@ -85,10 +84,10 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 		double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
 
 		// write types matrix shape of (1, D)
-		double [][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
+		double[][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
 		Arrays.stream(Y[0]).forEach(Math::ceil);
 
-		MatrixCharacteristics mc= new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
+		MatrixCharacteristics mc = new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
 		writeInputMatrixWithMTD("X2", X2, false, mc);
 		writeInputMatrixWithMTD("X3", X3, false, mc);
@@ -108,23 +107,21 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
 
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("Y"), expected("B")};
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+			input("Y"), expected("B")};
 		runTest(true, false, null, -1);
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats",  "100", "-nvargs",
+		programArgs = new String[] {"-stats", "100", "-nvargs",
 			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
-			"in_Y=" + input("Y"), // types
-			"rows=" + rows, "cols=" + cols,
-			"out=" + output("B")};
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "in_Y=" + input("Y"), // types
+			"rows=" + rows, "cols=" + cols, "out=" + output("B")};
 		runTest(true, false, null, -1);
 
 		// compare via files
@@ -134,7 +131,7 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 		// check for federated operations
 		Assert.assertTrue(heavyHittersContainsString("fed_uacmax"));
 
-		//check that federated input files are still existing
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
similarity index 64%
copy from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
copy to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
index 0adcb15..9579fef 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
@@ -17,11 +17,13 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated.primitives;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -36,89 +38,53 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedRightIndexTest extends AutomatedTestBase {
-	// private static final Log LOG = LogFactory.getLog(FederatedRightIndexTest.class.getName());
+public class FederatedVarTest extends AutomatedTestBase {
 
-	private final static String TEST_NAME1 = "FederatedRightIndexRightTest";
-	private final static String TEST_NAME2 = "FederatedRightIndexLeftTest";
-	private final static String TEST_NAME3 = "FederatedRightIndexFullTest";
+	private static final Log LOG = LogFactory.getLog(FederatedVarTest.class.getName());
 
+	private final static String TEST_NAME = "FederatedVarTest";
 	private final static String TEST_DIR = "functions/federated/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRightIndexTest.class.getSimpleName() + "/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedVarTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
+
 	@Parameterized.Parameter()
 	public int rows;
 	@Parameterized.Parameter(1)
 	public int cols;
-
 	@Parameterized.Parameter(2)
-	public int from;
-
-	@Parameterized.Parameter(3)
-	public int to;
-
-	@Parameterized.Parameter(4)
 	public boolean rowPartitioned;
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(new Object[][] {
-			{20, 10, 6, 8, true},
-			{20, 10, 1, 1, true},
-			{20, 10, 2, 10, true},
-			// {20, 10, 2, 10, true},
-			// {20, 12, 2, 10, false}, {20, 12, 1, 4, false}
+			// {10, 1000, false},
+			{100, 4, false},
+			// {36, 1000, true},
+			{1000, 10, true},
+			// {4, 100, true}
+			// {1600, 8, false},
 		});
 	}
 
-	private enum IndexType {
-		RIGHT, LEFT, FULL
-	}
-
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S.scalar"}));
 	}
 
 	@Test
-	public void testRightIndexRightDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.RIGHT, ExecMode.SINGLE_NODE);
+	public void testVarCP() {
+		runAggregateOperationTest(ExecMode.SINGLE_NODE);
 	}
 
-	@Test
-	public void testRightIndexLeftDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.LEFT, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testRightIndexFullDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.FULL, ExecMode.SINGLE_NODE);
-	}
-
-	private void runAggregateOperationTest(IndexType type, ExecMode execMode) {
+	private void runAggregateOperationTest(ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
 
 		if(rtplatform == ExecMode.SPARK)
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
-		String TEST_NAME = null;
-		switch(type) {
-			case RIGHT:
-				TEST_NAME = TEST_NAME1;
-				break;
-			case LEFT:
-				TEST_NAME = TEST_NAME2;
-				break;
-			case FULL:
-				TEST_NAME = TEST_NAME3;
-				break;
-		}
-
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
@@ -153,35 +119,34 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
-		if(rtplatform == ExecMode.SPARK) {
-			System.out.println(7);
+		if(rtplatform == ExecMode.SPARK)
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-		}
+
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-args", input("X1"), input("X2"), input("X3"), input("X4"), String.valueOf(from),
-			String.valueOf(to), Boolean.toString(rowPartitioned).toUpperCase(), expected("S")};
-		// LOG.error(runTest(null));
-		runTest(null);
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+			expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+		LOG.debug(runTest(null));
+
 		// Run actual dml script with federated matrix
 
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats", "100", "-nvargs",
+		programArgs = new String[] {"-explain", "-stats", "100", "-nvargs",
 			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols, "from=" + from,
-			"to=" + to, "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+
+		LOG.debug(runTest(null));
 
-		// LOG.error(runTest(null));
-		runTest(null);
 		// compare via files
-		compareResults(1e-9);
+		compareResults(0.05);
 
-		Assert.assertTrue(heavyHittersContainsString("fed_rightIndex"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uavar"));
 
 		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
index 1088b68..6ec2f40 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
@@ -45,7 +45,7 @@ public class FederatedSSLTest extends AutomatedTestBase {
 	// This test use the same scripts as the Federated Reader tests, just with SSL enabled.
 	private final static String TEST_DIR = "functions/federated/io/";
 	private final static String TEST_NAME = "FederatedReaderTest";
-	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedReaderTest.class.getSimpleName() + "/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedSSLTest.class.getSimpleName() + "/";
 	private final static int blocksize = 1024;
 	private final static File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR + "SSLConfig.xml");
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index 587da82..c8a50fe 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -59,7 +59,7 @@ public class FederatedWriterTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// number of rows or cols has to be >= number of federated locations.
-		return Arrays.asList(new Object[][] {{10, 13, true, 2},});
+		return Arrays.asList(new Object[][] {{10, 13, true, 2}});
 	}
 
 	@Test
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
index ada7412..c4d04ea 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -39,6 +41,7 @@ import org.junit.runners.Parameterized;
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedParamservTest extends AutomatedTestBase {
+	private static final Log LOG = LogFactory.getLog(FederatedParamservTest.class.getName());
 	private final static String TEST_DIR = "functions/federated/paramserv/";
 	private final static String TEST_NAME = "FederatedParamservTest";
 	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedParamservTest.class.getSimpleName() + "/";
@@ -159,9 +162,7 @@ public class FederatedParamservTest extends AutomatedTestBase {
 			}
 	
 			programArgs = programArgsList.toArray(new String[0]);
-			// ByteArrayOutputStream stdout =
-			runTest(null);
-			// System.out.print(stdout.toString());
+			LOG.debug(runTest(null));
 			Assert.assertEquals(0, Statistics.getNoOfExecutedSPInst());
 			
 			// cleanup
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index efd98c2..11f2bd4 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -56,8 +56,10 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
 		return Arrays.asList(new Object[][] {
-            {2, 1000}, 
-            {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}
+            // {2, 1000}, 
+			{10, 100}, 
+			// {100, 10}, {1000, 1}, 
+			// {10, 2000}, {2000, 10}
         });
 	}
 
@@ -66,12 +68,6 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 		federatedMultiply(Types.ExecMode.SINGLE_NODE);
 	}
 
-	/*
-	 * FIXME spark execution mode support
-	 * 
-	 * @Test public void federatedMultiplySP() { federatedMultiply(Types.ExecMode.SPARK); }
-	 */
-
 	public void federatedMultiply(Types.ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
@@ -109,7 +105,7 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-nvargs", "X1=" + input("X1"), "X2=" + input("X2"), "Y1=" + input("Y1"),
 			"Y2=" + input("Y2"), "Z=" + expected("Z")};
-		runTest(true, false, null, -1);
+		runTest(null);
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
@@ -117,7 +113,7 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 			"X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
 			"Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z")};
-		runTest(true, false, null, -1);
+		runTest(null);
 
 		// compare via files
 		compareResults(1e-9);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index c0a53d4..f11b7be 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -57,8 +57,10 @@ public class FederatedBinaryVectorTest extends AutomatedTestBase {
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
 		return Arrays.asList(new Object[][] {
-            {2, 1000}, 
-            {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}
+            // {2, 1000}, 
+			// {10, 100}, 
+			{100, 10}, 
+			// {1000, 1}, {10, 2000}, {2000, 10}
         });
 	}
 
@@ -67,12 +69,6 @@ public class FederatedBinaryVectorTest extends AutomatedTestBase {
 		federatedMultiply(Types.ExecMode.SINGLE_NODE);
 	}
 
-	/*
-	 * FIXME spark execution mode support
-	 * 
-	 * @Test public void federatedMultiplySP() { federatedMultiply(Types.ExecMode.SPARK); }
-	 */
-
 	public void federatedMultiply(Types.ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToFrameTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
similarity index 95%
rename from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToFrameTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
index 5e05bf5..b67cc93 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToFrameTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
@@ -38,12 +38,12 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederetedCastToFrameTest extends AutomatedTestBase {
-	private static final Log LOG = LogFactory.getLog(FederetedCastToFrameTest.class.getName());
+public class FederatedCastToFrameTest extends AutomatedTestBase {
+	private static final Log LOG = LogFactory.getLog(FederatedCastToFrameTest.class.getName());
 
 	private final static String TEST_DIR = "functions/federated/primitives/";
 	private final static String TEST_NAME = "FederatedCastToFrameTest";
-	private final static String TEST_CLASS_DIR = TEST_DIR + FederetedCastToFrameTest.class.getSimpleName() + "/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedCastToFrameTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
 	@Parameterized.Parameter()
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToMatrixTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
index b075e47..57ffacf 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederetedCastToMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
@@ -48,12 +48,12 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederetedCastToMatrixTest extends AutomatedTestBase {
-	private static final Log LOG = LogFactory.getLog(FederetedCastToMatrixTest.class.getName());
+public class FederatedCastToMatrixTest extends AutomatedTestBase {
+	private static final Log LOG = LogFactory.getLog(FederatedCastToMatrixTest.class.getName());
 
 	private final static String TEST_DIR = "functions/federated/primitives/";
 	private final static String TEST_NAME = "FederatedCastToMatrixTest";
-	private final static String TEST_CLASS_DIR = TEST_DIR + FederetedCastToMatrixTest.class.getSimpleName() + "/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedCastToMatrixTest.class.getSimpleName() + "/";
 
 	@Parameterized.Parameter()
 	public int rows;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
new file mode 100644
index 0000000..4d644ce
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedCentralMomentTest extends AutomatedTestBase {
+
+    private final static String TEST_DIR = "functions/federated/";
+    private final static String TEST_NAME = "FederatedCentralMomentTest";
+    private final static String TEST_CLASS_DIR = TEST_DIR + FederatedCentralMomentTest.class.getSimpleName() + "/";
+
+    private final static int blocksize = 1024;
+    @Parameterized.Parameter()
+    public int rows;
+
+    @Parameterized.Parameter(1)
+    public int k;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {1000, 2},
+                {1000, 3}, 
+                {1000, 4}
+        });
+    }
+
+    @Override
+    public void setUp() {
+        TestUtils.clearAssertionInformation();
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S.scalar"}));
+    }
+
+    @Test
+    public void federatedCentralMomentCP() { federatedCentralMoment(Types.ExecMode.SINGLE_NODE); }
+
+    @Test
+    @Ignore
+    public void federatedCentralMomentSP() { federatedCentralMoment(Types.ExecMode.SPARK); }
+
+    public void federatedCentralMoment(Types.ExecMode execMode) {
+        boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+        Types.ExecMode platformOld = rtplatform;
+
+        getAndLoadTestConfiguration(TEST_NAME);
+        String HOME = SCRIPT_DIR + TEST_DIR;
+
+        int r = rows / 4;
+
+        double[][] X1 = getRandomMatrix(r, 1, 1, 5, 1, 3);
+        double[][] X2 = getRandomMatrix(r, 1, 1, 5, 1, 7);
+        double[][] X3 = getRandomMatrix(r, 1, 1, 5, 1, 8);
+        double[][] X4 = getRandomMatrix(r, 1, 1, 5, 1, 9);
+
+        MatrixCharacteristics mc = new MatrixCharacteristics(r, 1, blocksize, r);
+        writeInputMatrixWithMTD("X1", X1, false, mc);
+        writeInputMatrixWithMTD("X2", X2, false, mc);
+        writeInputMatrixWithMTD("X3", X3, false, mc);
+        writeInputMatrixWithMTD("X4", X4, false, mc);
+
+        // empty script name because we don't execute any script, just start the worker
+        fullDMLScriptName = "";
+        int port1 = getRandomAvailablePort();
+        int port2 = getRandomAvailablePort();
+        int port3 = getRandomAvailablePort();
+        int port4 = getRandomAvailablePort();
+        Thread t1 = startLocalFedWorkerThread(port1);
+        Thread t2 = startLocalFedWorkerThread(port2);
+        Thread t3 = startLocalFedWorkerThread(port3);
+        Thread t4 = startLocalFedWorkerThread(port4);
+
+        // reference file should not be written to hdfs, so we set platform here
+        rtplatform = execMode;
+        if(rtplatform == Types.ExecMode.SPARK) {
+            DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+        }
+        // Run reference dml script with normal matrix for Row/Col
+        fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+        programArgs = new String[] {"-stats", "100", "-args", 
+            input("X1"), input("X2"), input("X3"), input("X4"), expected("S"), String.valueOf(k)};
+        runTest(null);
+
+        TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+        loadTestConfiguration(config);
+
+        fullDMLScriptName = HOME + TEST_NAME + ".dml";
+        programArgs = new String[] {"-stats", "100", "-nvargs",
+            "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+            "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+            "in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+            "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+            "rows=" + rows,
+            "cols=" + 1,
+            "out_S=" + output("S"),
+            "k=" + k};
+        runTest(null);
+
+        // compare all sums via files
+        compareResults(0.01);
+
+        Assert.assertTrue(heavyHittersContainsString("fed_cm"));
+
+        // check that federated input files are still existing
+        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+        TestUtils.shutdownThreads(t1, t2, t3, t4);
+        rtplatform = platformOld;
+        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+    }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
index 8a3b4d1..edfd1b4 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
@@ -19,6 +19,13 @@
 
 package org.apache.sysds.test.functions.federated.primitives;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.FileFormat;
@@ -26,17 +33,11 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedConstructionTest extends AutomatedTestBase {
@@ -56,7 +57,10 @@ public class FederatedConstructionTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// cols have to be dividable by 4 for Frame tests
-		return Arrays.asList(new Object[][] {{1, 1024}, {8, 256}, {256, 8}, {1024, 4}, {16, 2048}, {2048, 32}});
+		return Arrays.asList(new Object[][] {
+			// {1, 1024}, {8, 256}, {256, 8}, {1024, 4}, {16, 2048},
+			 {2048, 32}
+		});
 	}
 
 	@Override
@@ -71,6 +75,7 @@ public class FederatedConstructionTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void federatedMatrixConstructionSP() {
 		federatedMatrixConstruction(Types.ExecMode.SPARK);
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
index 00f0f50..1617ab6 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -31,6 +31,7 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -57,7 +58,13 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(
-			new Object[][] {{10, 1000, false}, {100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}});
+			new Object[][] {
+				// {10, 1000, false}, 
+				{100, 4, false}, 
+				// {36, 1000, true}, 
+				// {1000, 10, true}, 
+				{4, 100, true}
+			});
 	}
 
 	private enum OpType {
@@ -94,21 +101,25 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void testSumDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.SUM, ExecType.SPARK);
 	}
 
 	@Test
+	@Ignore
 	public void testMeanDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.MEAN, ExecType.SPARK);
 	}
 
 	@Test
+	@Ignore
 	public void testMaxDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.MAX, ExecType.SPARK);
 	}
 
 	@Test
+	@Ignore
 	public void testMinDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.MIN, ExecType.SPARK);
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
index becb246..bd19ee8 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
@@ -37,7 +37,10 @@ import static java.lang.Thread.sleep;
 public class FederatedMatrixScalarOperationsTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Iterable<Object[]> data() {
-		return Arrays.asList(new Object[][] {{100, 100}, {10000, 100},});
+		return Arrays.asList(new Object[][] {
+			{100, 100}, 
+		// {10000, 100}
+		});
 	}
 
 	// internals 4 parameterized tests
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index 3220e1a..4170914 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -56,7 +56,11 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{2, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}});
+		return Arrays.asList(new Object[][] {
+			// {2, 1000}, {10, 100},
+			{100, 10}, {1000, 1},
+			// {10, 2000}, {2000, 10}
+		});
 	}
 
 	@Test
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileTest.java
new file mode 100644
index 0000000..1a71279
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedQuantileTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/quantile/";
+	private final static String TEST_NAME1 = "FederatedQuantileTest";
+	private final static String TEST_NAME2 = "FederatedMedianTest";
+	private final static String TEST_NAME3 = "FederatedIQMTest";
+	private final static String TEST_NAME4 = "FederatedQuantilesTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedQuantileTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {{1000}});
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+	}
+
+	@Test
+	public void federatedQuantile1CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.25); }
+
+	@Test
+	public void federatedQuantile2CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.5); }
+
+	@Test
+	public void federatedQuantile3CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.75); }
+
+	@Test
+	public void federatedMedianCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME2, -1); }
+
+	@Test
+	public void federatedIQMCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, -1); }
+
+	@Test
+	public void federatedQuantilesCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, -1); }
+
+	@Test
+	@Ignore
+	public void federatedQuantile1SP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME1, 0.25); }
+
+	@Test
+	@Ignore
+	public void federatedQuantile2SP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME1, 0.5); }
+
+	@Test
+	@Ignore
+	public void federatedQuantile3SP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME1, 0.75); }
+
+	@Test
+	@Ignore
+	public void federatedMedianSP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME2, -1); }
+
+	@Test
+	@Ignore
+	public void federatedIQMSP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME1, -1); }
+
+	@Test
+	@Ignore
+	public void federatedQuantilesSP() { federatedQuartile(Types.ExecMode.SPARK, TEST_NAME1, -1); }
+
+
+
+
+
+	public void federatedQuartile(Types.ExecMode execMode, String TEST_NAME, double p) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		double[][] X1 = getRandomMatrix(rows, 1, 1, 5, 1, 3);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(rows, 1, blocksize, rows);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+
+		// we need the reference file to not be written to hdfs, so we get the correct format
+		rtplatform = Types.ExecMode.SINGLE_NODE;
+		// Run reference dml script with normal matrix for Row/Col
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-explain", "-stats", "100", "-args", input("X1"), expected("S"), String.valueOf(p)};
+		runTest(true, false, null, -1);
+
+		// reference file should not be written to hdfs, so we set platform here
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-explain", "-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"rows=" + rows, "cols=" + 1, "p=" + String.valueOf(p),
+			"out_S=" + output("S")
+		};
+		runTest(true, false, null, -1);
+
+		// compare all sums via files
+		compareResults(1e-9);
+		Assert.assertTrue(heavyHittersContainsString("fed_qsort"));
+		Assert.assertTrue(heavyHittersContainsString("fed_qpick"));
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+
+		TestUtils.shutdownThreads(t1);
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileWeightsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileWeightsTest.java
new file mode 100644
index 0000000..4511c6d
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedQuantileWeightsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedQuantileWeightsTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/quantile/";
+	private final static String TEST_NAME1 = "FederatedQuantileWeightsTest";
+	private final static String TEST_NAME2 = "FederatedMedianWeightsTest";
+	private final static String TEST_NAME3 = "FederatedIQMWeightsTest";
+	private final static String TEST_NAME4 = "FederatedQuantilesWeightsTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedQuantileWeightsTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {{1000}});
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+	}
+
+	@Test
+	public void federatedQuantile1CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.25); }
+
+	@Test
+	public void federatedQuantile2CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.5); }
+
+	@Test
+	public void federatedQuantile3CP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, 0.75); }
+
+	@Test
+	public void federatedMedianCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME2, -1); }
+
+	@Test
+	public void federatedIQMCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, -1); }
+
+	@Test
+	public void federatedQuantilesCP() { federatedQuartile(Types.ExecMode.SINGLE_NODE, TEST_NAME1, -1); }
+
+	public void federatedQuartile(Types.ExecMode execMode, String TEST_NAME, double p) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		double[][] X1 = getRandomMatrix(rows, 1, 1, 5, 1, 3);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(rows, 1, blocksize, rows);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+
+		double[][] W = getRandomMatrix(rows, 1, 1, 1, 1.0, 1);
+		writeInputMatrixWithMTD("W", W, false);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+
+		// we need the reference file to not be written to hdfs, so we get the correct format
+		rtplatform = Types.ExecMode.SINGLE_NODE;
+		// Run reference dml script with normal matrix for Row/Col
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-explain", "-stats", "100", "-args", input("X1"), expected("S"), String.valueOf(p), input("W")};
+		runTest(true, false, null, -1);
+
+		// reference file should not be written to hdfs, so we set platform here
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-explain", "-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"rows=" + rows, "cols=" + 1,
+			"p=" + String.valueOf(p), "W=" + input("W"),
+			"out_S=" + output("S")
+		};
+		runTest(true, false, null, -1);
+
+		// compare all sums via files
+		compareResults(1e-9);
+		Assert.assertTrue(heavyHittersContainsString("fed_qsort"));
+		Assert.assertTrue(heavyHittersContainsString("fed_qpick"));
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+
+		TestUtils.shutdownThreads(t1);
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index ca745b9..abf37eb 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -19,18 +19,19 @@
 
 package org.apache.sysds.test.functions.federated.primitives;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-import java.util.Arrays;
-import java.util.Collection;
-
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedRCBindTest extends AutomatedTestBase {
@@ -48,7 +49,14 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		//TODO add tests and support of aligned blocksized (which is however a special case)
-		return Arrays.asList(new Object[][] {{1, 1001}, {10, 100}, {100, 10}, {1001, 1}, {10, 2001}, {2001, 10}});
+		return Arrays.asList(new Object[][] {
+			// {1, 1001},
+			//  {10, 100}, 
+			{100, 10}, 
+			// {1001, 1},
+			// {10, 2001}, 
+			// {2001, 10}
+		});
 	}
 
 	@Override
@@ -67,6 +75,7 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void federatedRCBindSP() {
 		federatedRCBind(Types.ExecMode.SPARK);
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
index 0adcb15..d5f81e9 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
@@ -22,6 +22,8 @@ package org.apache.sysds.test.functions.federated.primitives;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -37,7 +39,7 @@ import org.junit.runners.Parameterized;
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedRightIndexTest extends AutomatedTestBase {
-	// private static final Log LOG = LogFactory.getLog(FederatedRightIndexTest.class.getName());
+	private static final Log LOG = LogFactory.getLog(FederatedRightIndexTest.class.getName());
 
 	private final static String TEST_NAME1 = "FederatedRightIndexRightTest";
 	private final static String TEST_NAME2 = "FederatedRightIndexLeftTest";
@@ -64,11 +66,12 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(new Object[][] {
-			{20, 10, 6, 8, true},
+			// {20, 10, 6, 8, true},
 			{20, 10, 1, 1, true},
 			{20, 10, 2, 10, true},
 			// {20, 10, 2, 10, true},
-			// {20, 12, 2, 10, false}, {20, 12, 1, 4, false}
+			// {20, 12, 2, 10, false}, 
+			// {20, 12, 1, 4, false}
 		});
 	}
 
@@ -164,8 +167,7 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("X3"), input("X4"), String.valueOf(from),
 			String.valueOf(to), Boolean.toString(rowPartitioned).toUpperCase(), expected("S")};
-		// LOG.error(runTest(null));
-		runTest(null);
+		LOG.debug(runTest(null));
 		// Run actual dml script with federated matrix
 
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
@@ -176,8 +178,8 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols, "from=" + from,
 			"to=" + to, "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
 
-		// LOG.error(runTest(null));
-		runTest(null);
+		LOG.debug(runTest(null));
+
 		// compare via files
 		compareResults(1e-9);
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
index b1c3991..7bfbeee 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
@@ -60,7 +60,7 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(
-			new Object[][] {{10, 1000, false}, 
+			new Object[][] {{10, 1000, false},
 			//{100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}
 		});
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
index 04f2828..9c4b6d0 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
@@ -30,6 +30,7 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -53,7 +54,7 @@ public class FederatedSplitTest extends AutomatedTestBase {
 
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {{152, 12, "TRUE"},{132, 11, "FALSE"}});
+        return Arrays.asList(new Object[][] {{152, 12, "TRUE"}, {132, 11, "FALSE"}});
     }
 
     @Override
@@ -108,7 +109,7 @@ public class FederatedSplitTest extends AutomatedTestBase {
 
         // Run actual dml script with federated matrix
         fullDMLScriptName = HOME + TEST_NAME + ".dml";
-        programArgs = new String[] {"-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")),
+        programArgs = new String[] {"-stats", "100", "-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")),
             "X2=" + TestUtils.federatedAddress(port2, input("X2")),
             "Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
             "Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z"),
@@ -120,6 +121,16 @@ public class FederatedSplitTest extends AutomatedTestBase {
         // compare via files
         compareResults(1e-9);
 
+        if(cont.equals("TRUE"))
+            Assert.assertTrue(heavyHittersContainsString("fed_rightIndex"));
+        else{
+
+            Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+            // TODO add federated diag operator.
+            // Assert.assertTrue(heavyHittersContainsString("fed_rdiag"));
+
+        }
+        
         TestUtils.shutdownThreads(t1, t2);
         rtplatform = platformOld;
         DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
index 0eccc8d..99e649f 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
@@ -59,7 +59,11 @@ public class FederatedStatisticsTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{10000, 10}, {1000, 100}, {2000, 43}});
+		return Arrays.asList(new Object[][] {
+			// {10000, 10}, 
+			// {1000, 100}, 
+			{2000, 43}
+		});
 	}
 
 	@Test
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
index 9ce65eb..3d03f7b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
@@ -19,18 +19,19 @@
 
 package org.apache.sysds.test.functions.federated.primitives;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
@@ -48,7 +49,11 @@ public class FederatedSumTest extends AutomatedTestBase {
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {{2, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}});
+		return Arrays.asList(new Object[][] {
+			// {2, 1000}, {10, 100}, 
+			{100, 10}, {1000, 1},
+			//  {10, 2000}, {2000, 10}
+			});
 	}
 
 	@Override
@@ -64,6 +69,7 @@ public class FederatedSumTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void federatedSumSP() {
 		federatedSum(Types.ExecMode.SPARK);
 	}
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedCentralMomentTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedCentralMomentTest.dml
index 9c473d8..477b7b9 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedCentralMomentTest.dml
@@ -17,13 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+s = moment(A, $k);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedCentralMomentTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedCentralMomentTestReference.dml
index 9c473d8..15c3a7d 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedCentralMomentTestReference.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+A = rbind(read($1), read($2), read($3), read($4));
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+s = moment(A, $6);
+write(s, $5);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedCorTest.dml
similarity index 57%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedCorTest.dml
index 9c473d8..b4d8e80 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedCorTest.dml
@@ -17,13 +17,18 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+s = cor(A);
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedCorTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedCorTestReference.dml
index 9c473d8..8e40e33 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedCorTestReference.dml
@@ -17,13 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+s = cor(A);
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+write(s, $5);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedVarTest.dml
similarity index 57%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedVarTest.dml
index 9c473d8..88fd6a8 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedVarTest.dml
@@ -17,13 +17,17 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+s = var(A);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/FederatedVarTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/FederatedVarTestReference.dml
index 9c473d8..36929c9 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/FederatedVarTestReference.dml
@@ -17,13 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+s = var(A);
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+write(s, $5);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedIQRTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedIQRTest.dml
index 9c473d8..f50e72e 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedIQRTest.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+s = interQuartileMean(A);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedIQRTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedIQRTestReference.dml
index 9c473d8..84fcace 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedIQRTestReference.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+s = interQuartileMean(A);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTest.dml
index 9c473d8..99b48c6 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTest.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+W = read($W);
+s = interQuartileMean(A, W);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTestReference.dml
index 9c473d8..afc9a1f 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedIQRWeightsTestReference.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+W = read($4);
+s = interQuartileMean(A, W);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedMedianTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedMedianTest.dml
index 9c473d8..22f2504 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedMedianTest.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+s = median(A);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedMedianTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedMedianTestReference.dml
index 9c473d8..1544987 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedMedianTestReference.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+s = median(A);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTest.dml
index 9c473d8..58ec328 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTest.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+W = read($W);
+s = median(A, W);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTestReference.dml
index 9c473d8..6b9e3de 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedMedianWeightsTestReference.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+W = read($4);
+s = median(A, W);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantileTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantileTest.dml
index 9c473d8..1c84330 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantileTest.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+s = quantile(A, $p);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantileTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantileTestReference.dml
index 9c473d8..7a1fb36 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantileTestReference.dml
@@ -17,13 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+s = quantile (A, $3);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTest.dml
index 9c473d8..c423a65 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTest.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+W = read($W);
+s = quantile(A, W, $p);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTestReference.dml
index 9c473d8..6796757 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantileWeightsTestReference.dml
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+W = read($4);
+s = quantile (A, W, $3);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantilesTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantilesTest.dml
index 9c473d8..f5f22fa 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantilesTest.dml
@@ -17,13 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+P = matrix(0.25, 3, 1);
+P[2,1] = 0.5;
+P[3,1] = 0.75;
+s = quantile(X=A, P=P);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantilesTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantilesTestReference.dml
index 9c473d8..96970d0 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantilesTestReference.dml
@@ -17,13 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+P = matrix(0.25, 3, 1);
+P[2,1] = 0.5;
+P[3,1] = 0.75;
+s = quantile(X=A, P=P);
+write(s, $2);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTest.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTest.dml
index 9c473d8..86f9611 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTest.dml
@@ -17,13 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = federated(addresses=list($in_X1), ranges=list(list(0, 0), list($rows, $cols)));
+P = matrix(0.25, 3, 1);
+P[2,1] = 0.5;
+P[3,1] = 0.75;
+W = read($W);
+s = quantile(X=A, W=W, P=P);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTestReference.dml
similarity index 70%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTestReference.dml
index 9c473d8..3c6d7fd 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/quantile/FederatedQuantilesWeightsTestReference.dml
@@ -17,13 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
\ No newline at end of file
+A = read($1);
+P = matrix(0.25, 3, 1);
+P[2,1] = 0.5;
+P[3,1] = 0.75;
+W = read($W);
+s = quantile(X=A, W=W, P=P);
+write(s, $2);


[systemds] 02/03: [MINOR] Modifications to Federated Tests

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 809d53f6236232cb31eb02668263aab2ac80116d
Author: Olga <ov...@gmail.com>
AuthorDate: Fri Nov 13 03:01:11 2020 +0100

    [MINOR] Modifications to Federated Tests
    
    This commit change the federated tests to execute more consistantly on
    github.
---
 scripts/builtin/dist.dml                           |  1 -
 .../controlprogram/federated/FederationUtils.java  | 42 ++++------------------
 .../fed/AggregateUnaryFEDInstruction.java          |  2 +-
 .../instructions/fed/FEDInstructionUtils.java      | 17 ++++-----
 .../federated/algorithms/FederatedCorTest.java     |  7 ++--
 .../federated/algorithms/FederatedVarTest.java     |  8 ++---
 .../primitives/FederatedFullAggregateTest.java     | 31 ++++++++++++----
 .../primitives/FederatedRowColAggregateTest.java   | 37 ++++++++++++++-----
 .../federated/aggregate/FederatedColVarTest.dml    | 20 ++++++-----
 .../aggregate/FederatedColVarTestReference.dml     | 14 +++-----
 .../federated/aggregate/FederatedRowVarTest.dml    | 20 ++++++-----
 .../aggregate/FederatedRowVarTestReference.dml     | 14 +++-----
 .../federated/aggregate/FederatedVarTest.dml       | 21 ++++++-----
 .../aggregate/FederatedVarTestReference.dml        | 14 +++-----
 14 files changed, 124 insertions(+), 124 deletions(-)

diff --git a/scripts/builtin/dist.dml b/scripts/builtin/dist.dml
index e5fe930..1245087 100644
--- a/scripts/builtin/dist.dml
+++ b/scripts/builtin/dist.dml
@@ -24,7 +24,6 @@ m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
     G = X %*% t(X);
     I = matrix(1, rows = nrow(G), cols = ncol(G));
     Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
     Y = sqrt(Y);
     Y = replace(target = Y, pattern=0/0, replacement = 0);
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 0a24cea..22f4e69 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -172,30 +172,6 @@ public class FederationUtils {
 
 	public static MatrixBlock aggVar(Future<FederatedResponse>[] ffr, Future<FederatedResponse>[] meanFfr, FederationMap map, boolean isRowAggregate, boolean isScalar) {
 		try {
-//			else if(aop.aggOp.increOp.fn instanceof CM) {
-//				double var = ((ScalarObject) ffr[0].get().getData()[0]).getDoubleValue();
-//				double mean = ((ScalarObject) meanFfr[0].get().getData()[0]).getDoubleValue();
-//				long size = map.getFederatedRanges()[0].getSize();
-//				for(int i = 0; i < ffr.length - 1; i++) {
-//					long l = size + map.getFederatedRanges()[i+1].getSize();
-//					double k = ((size * var) + (map.getFederatedRanges()[i+1].getSize() * ((ScalarObject) ffr[i+1].get().getData()[0]).getDoubleValue())) / l;
-//					var = k + (size * map.getFederatedRanges()[i+1].getSize()) * Math.pow((mean - ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue()) / l, 2);
-//					mean = (mean *  size + ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue() * (map.getFederatedRanges()[i+1].getSize())) / l;
-//					size = l;
-//					System.out.println("Olga");
-//					//					long l = sizes[i] + sizes[i + 1];
-//					//					double k = Math.pow(means[i] - means[i+1], 2) * (sizes[i] * sizes[i+1]);
-//					//					k += ((sizes[i] * vars[i]) + (sizes[i+1] * vars[i+1])) * l;
-//					//					vars[i+1] = k / Math.pow(l, 2);
-//					//
-//					//					means[i+1] = (means[i] * sizes[i] + means[i] * sizes[i]) / l;
-//					//					sizes[i+1] = l;
-//				}
-//				return new DoubleObject(var);
-//
-//			}
-
-
 			FederatedRange[] ranges = map.getFederatedRanges();
 			BinaryOperator plus = InstructionUtils.parseBinaryOperator("+");
 			BinaryOperator minus = InstructionUtils.parseBinaryOperator("-");
@@ -204,13 +180,13 @@ public class FederationUtils {
 			ScalarOperator dev1 = InstructionUtils.parseScalarBinaryOperator("/", false);
 			ScalarOperator pow = InstructionUtils.parseScalarBinaryOperator("^2", false);
 
-			long size1 = isScalar ? ranges[0].getSize() : ranges[0].getSize(isRowAggregate ? 0 : 1);
+			long size1 = isScalar ? ranges[0].getSize() : ranges[0].getSize(isRowAggregate ? 1 : 0);
 			MatrixBlock var1 = (MatrixBlock)ffr[0].get().getData()[0];
 			MatrixBlock mean1 = (MatrixBlock)meanFfr[0].get().getData()[0];
 			for(int i=0; i < ffr.length - 1; i++) {
 				MatrixBlock var2 = (MatrixBlock)ffr[i+1].get().getData()[0];
 				MatrixBlock mean2 = (MatrixBlock)meanFfr[i+1].get().getData()[0];
-				long size2 = isScalar ? ranges[i+1].getSize() : ranges[i+1].getSize(isRowAggregate ? 0 : 1);
+				long size2 = isScalar ? ranges[i+1].getSize() : ranges[i+1].getSize(isRowAggregate ? 1 : 0);
 
 				mult1 = mult1.setConstant(size1);
 				var1 = var1.scalarOperations(mult1, new MatrixBlock());
@@ -219,11 +195,12 @@ public class FederationUtils {
 				dev1 = dev1.setConstant(size1 + size2);
 				var1 = var1.scalarOperations(dev1, new MatrixBlock());
 
-				MatrixBlock tmp1 = (mean1.binaryOperationsInPlace(minus, mean2)).scalarOperations(dev1, new MatrixBlock());
+				MatrixBlock tmp1 = new MatrixBlock(mean1);
+				tmp1 = tmp1.binaryOperationsInPlace(minus, mean2);
+				tmp1 = tmp1.scalarOperations(dev1, new MatrixBlock());
 				tmp1 = tmp1.scalarOperations(pow, new MatrixBlock());
 				mult1 = mult1.setConstant(size1*size2);
 				tmp1 = tmp1.scalarOperations(mult1, new MatrixBlock());
-
 				var1 = tmp1.binaryOperationsInPlace(plus, var1);
 
 				// next mean
@@ -272,13 +249,6 @@ public class FederationUtils {
 					var = k + (size * map.getFederatedRanges()[i+1].getSize()) * Math.pow((mean - ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue()) / l, 2);
 					mean = (mean *  size + ((ScalarObject) meanFfr[i+1].get().getData()[0]).getDoubleValue() * (map.getFederatedRanges()[i+1].getSize())) / l;
 					size = l;
-//					long l = sizes[i] + sizes[i + 1];
-//					double k = Math.pow(means[i] - means[i+1], 2) * (sizes[i] * sizes[i+1]);
-//					k += ((sizes[i] * vars[i]) + (sizes[i+1] * vars[i+1])) * l;
-//					vars[i+1] = k / Math.pow(l, 2);
-//
-//					means[i+1] = (means[i] * sizes[i] + means[i] * sizes[i]) / l;
-//					sizes[i+1] = l;
 				}
 				return new DoubleObject(var);
 
@@ -311,7 +281,7 @@ public class FederationUtils {
 			boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
 			return aggMinMax(ffr,isMin,false, Optional.of(map.getType()));
 		} else if(aop.aggOp.increOp.fn instanceof CM) {
-			return aggVar(ffr, meanFfr, map, aop.isRowAggregate(), !(aop.isColAggregate() && aop.isRowAggregate())); //TODO
+			return aggVar(ffr, meanFfr, map, aop.isRowAggregate(), !(aop.isColAggregate() || aop.isRowAggregate())); //TODO
 		}
 		else
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
index 1429dd3..b9f220b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateUnaryFEDInstruction.java
@@ -119,6 +119,6 @@ public class AggregateUnaryFEDInstruction extends UnaryFEDInstruction {
 		if( output.isScalar() )
 			ec.setVariable(output.getName(), FederationUtils.aggScalar(aop, tmp, meanTmp, map));
 		else
-			ec.setMatrixOutput(output.getName(), FederationUtils.aggMatrix(aop, meanTmp, tmp, map));
+			ec.setMatrixOutput(output.getName(), FederationUtils.aggMatrix(aop, tmp, meanTmp, map));
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 1b095e1..f4b19bf 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -99,23 +99,20 @@ public class FEDInstructionUtils {
 				if((mo instanceof MatrixObject || mo instanceof FrameObject) && mo.isFederated() )
 					fedinst = ReorgFEDInstruction.parseInstruction(rinst.getInstructionString());
 			}
-			else if(instruction.input1 != null && instruction.input1.isMatrix() 
-				&& ec.getMatrixObject(instruction.input1).isFederated()
+			else if(instruction.input1 != null && instruction.input1.isMatrix()
 				&& ec.containsVariable(instruction.input1)) {
 				
 				MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
 				
-				if(instruction.getOpcode().equalsIgnoreCase("cm")) {
+				if(instruction.getOpcode().equalsIgnoreCase("cm") && mo1.isFederated()) {
 					fedinst = CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
-				}
-				else if(inst instanceof AggregateUnaryCPInstruction  &&
+				} else if(inst.getOpcode().equalsIgnoreCase("qsort") && mo1.isFederated()) {
+					if(mo1.getFedMapping().getFederatedRanges().length == 1)
+						fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+				} else if(inst instanceof AggregateUnaryCPInstruction  && mo1.isFederated() &&
 					((AggregateUnaryCPInstruction) instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT) {
 					fedinst = AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
 				}
-				else if(inst.getOpcode().equalsIgnoreCase("qsort") &&
-					mo1.getFedMapping().getFederatedRanges().length == 1) {
-					fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
-				}
 			}
 		}
 		else if (inst instanceof BinaryCPInstruction) {
@@ -154,7 +151,7 @@ public class FEDInstructionUtils {
 			// matrix indexing
 			LOG.info("Federated Indexing");
 			MatrixIndexingCPInstruction minst = (MatrixIndexingCPInstruction) inst;
-			if(inst.getOpcode().equalsIgnoreCase("rightIndex") 
+			if(inst.getOpcode().equalsIgnoreCase("rightIndex")
 				&& minst.input1.isMatrix() && ec.getCacheableData(minst.input1).isFederated()) {
 				LOG.info("Federated Right Indexing");
 				fedinst = MatrixIndexingFEDInstruction.parseInstruction(minst.getInstructionString());
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
index 1b06279..15383b2 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
@@ -53,7 +53,7 @@ public class FederatedCorTest extends AutomatedTestBase {
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {{1600, 8, true}});
+		return Arrays.asList(new Object[][] {{1600, 40, true}});
 	}
 
 	@Override
@@ -133,9 +133,10 @@ public class FederatedCorTest extends AutomatedTestBase {
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(1e-9);
+		compareResults(1e-2);
 
-		// Assert.assertTrue(heavyHittersContainsString("k+"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uacvar"));
+		Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
 
 		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
index 9579fef..280f0d3 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
@@ -58,12 +58,8 @@ public class FederatedVarTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(new Object[][] {
-			// {10, 1000, false},
-			{100, 4, false},
-			// {36, 1000, true},
-			{1000, 10, true},
-			// {4, 100, true}
-			// {1600, 8, false},
+			{1000, 40, false},
+			{1000, 400, true}
 		});
 	}
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
index 1617ab6..ec7bda6 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -43,6 +43,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	private final static String TEST_NAME2 = "FederatedMeanTest";
 	private final static String TEST_NAME3 = "FederatedMaxTest";
 	private final static String TEST_NAME4 = "FederatedMinTest";
+	private final static String TEST_NAME5 = "FederatedVarTest";
 
 	private final static String TEST_DIR = "functions/federated/aggregate/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedFullAggregateTest.class.getSimpleName() + "/";
@@ -68,7 +69,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	}
 
 	private enum OpType {
-		SUM, MEAN, MAX, MIN
+		SUM, MEAN, MAX, MIN, VAR
 	}
 
 	@Override
@@ -78,6 +79,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S.scalar"}));
 		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S.scalar"}));
 		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S.scalar"}));
 	}
 
 	@Test
@@ -101,7 +103,11 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	}
 
 	@Test
-	@Ignore
+	public void testVarDenseMatrixCP() {
+		runColAggregateOperationTest(OpType.VAR, ExecType.CP);
+	}
+
+	@Test
 	public void testSumDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.SUM, ExecType.SPARK);
 	}
@@ -124,6 +130,11 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 		runColAggregateOperationTest(OpType.MIN, ExecType.SPARK);
 	}
 
+	@Test
+	public void testVarDenseMatrixSP() {
+		runColAggregateOperationTest(OpType.VAR, ExecType.SPARK);
+	}
+
 	private void runColAggregateOperationTest(OpType type, ExecType instType) {
 		ExecMode platformOld = rtplatform;
 		switch(instType) {
@@ -152,6 +163,9 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 			case MIN:
 				TEST_NAME = TEST_NAME4;
 				break;
+			case VAR:
+				TEST_NAME = TEST_NAME5;
+				break;
 		}
 
 		getAndLoadTestConfiguration(TEST_NAME);
@@ -165,10 +179,10 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 			c = cols;
 		}
 
-		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
-		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
-		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
-		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+		double[][] X1 = getRandomMatrix(r, c, 1, 3, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 3, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 3, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 3, 1, 9);
 
 		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
@@ -209,7 +223,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(1e-9);
+		compareResults(type == OpType.VAR ? 1e-2 : 1e-9);
 
 		switch(type) {
 			case SUM:
@@ -224,6 +238,9 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 			case MIN:
 				Assert.assertTrue(heavyHittersContainsString("fed_uamin"));
 				break;
+			case VAR:
+				Assert.assertTrue(heavyHittersContainsString("fed_uavar"));
+				break;
 		}
 
 		// check that federated input files are still existing
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
index 7bfbeee..31800af 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
@@ -45,6 +45,8 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 	private final static String TEST_NAME6 = "FederatedRowMeanTest";
 	private final static String TEST_NAME7 = "FederatedRowMaxTest";
 	private final static String TEST_NAME8 = "FederatedRowMinTest";
+	private final static String TEST_NAME9 = "FederatedRowVarTest";
+	private final static String TEST_NAME10 = "FederatedColVarTest";
 
 	private final static String TEST_DIR = "functions/federated/aggregate/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowColAggregateTest.class.getSimpleName() + "/";
@@ -60,13 +62,14 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		return Arrays.asList(
-			new Object[][] {{10, 1000, false},
-			//{100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}
+			new Object[][] {
+				{10, 1000, false},
+				{1000, 40, true},
 		});
 	}
 
 	private enum OpType {
-		SUM, MEAN, MAX, MIN
+		SUM, MEAN, MAX, MIN, VAR
 	}
 
 	private enum InstType {
@@ -84,6 +87,8 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME9, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME9, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME10, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME10, new String[] {"S"}));
 	}
 
 	@Test
@@ -126,6 +131,16 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		runAggregateOperationTest(OpType.MIN, InstType.COL, ExecMode.SINGLE_NODE);
 	}
 
+	@Test
+	public void testRowVarDenseMatrixCP() {
+		runAggregateOperationTest(OpType.VAR, InstType.ROW, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testColVarDenseMatrixCP() {
+		runAggregateOperationTest(OpType.VAR, InstType.COL, ExecMode.SINGLE_NODE);
+	}
+
 	private void runAggregateOperationTest(OpType type, InstType instr, ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
@@ -147,6 +162,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 			case MIN:
 				TEST_NAME = instr == InstType.COL ? TEST_NAME4 : TEST_NAME8;
 				break;
+			case VAR:
+				TEST_NAME = instr == InstType.COL ? TEST_NAME10 : TEST_NAME9;
+				break;
 		}
 
 		getAndLoadTestConfiguration(TEST_NAME);
@@ -160,10 +178,10 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 			c = cols;
 		}
 
-		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
-		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
-		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
-		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+		double[][] X1 = getRandomMatrix(r, c, 1, 3, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 3, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 3, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 3, 1, 9);
 
 		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
@@ -209,7 +227,7 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(1e-9);
+		compareResults(type == FederatedRowColAggregateTest.OpType.VAR ? 1e-2 : 1e-9);
 
 		String fedInst = instr == InstType.COL ? "fed_uac" : "fed_uar";
 
@@ -226,6 +244,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 			case MIN:
 				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("min")));
 				break;
+			case VAR:
+				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("var")));
+				break;
 		}
 
 		// check that federated input files are still existing
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedColVarTest.dml
similarity index 57%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedColVarTest.dml
index e5fe930..186dc1d 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColVarTest.dml
@@ -17,14 +17,18 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 }
+
+s = colVars(A);
+write(s, $out_S);
\ No newline at end of file
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedColVarTestReference.dml
similarity index 67%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedColVarTestReference.dml
index e5fe930..ec9b021 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColVarTestReference.dml
@@ -17,14 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
+s = colVars(A);
+write(s, $5);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowVarTest.dml
similarity index 57%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedRowVarTest.dml
index e5fe930..8b4a57d 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowVarTest.dml
@@ -17,14 +17,18 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 }
+
+s = rowVars(A);
+write(s, $out_S);
\ No newline at end of file
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowVarTestReference.dml
similarity index 67%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedRowVarTestReference.dml
index e5fe930..e983899 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowVarTestReference.dml
@@ -17,14 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
+s = rowVars(A);
+write(s, $5);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedVarTest.dml
similarity index 57%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedVarTest.dml
index e5fe930..88fd6a8 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedVarTest.dml
@@ -17,14 +17,17 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
-
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 }
+
+s = var(A);
+write(s, $out_S);
diff --git a/scripts/builtin/dist.dml b/src/test/scripts/functions/federated/aggregate/FederatedVarTestReference.dml
similarity index 67%
copy from scripts/builtin/dist.dml
copy to src/test/scripts/functions/federated/aggregate/FederatedVarTestReference.dml
index e5fe930..af98e13 100644
--- a/scripts/builtin/dist.dml
+++ b/src/test/scripts/functions/federated/aggregate/FederatedVarTestReference.dml
@@ -17,14 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+#-------------------------------------------------------------
 
-# Returns Euclidian distance matrix (distances between N n-dimensional points)
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
 
-m_dist = function(Matrix[Double] X) return (Matrix[Double] Y) {
-    G = X %*% t(X);
-    I = matrix(1, rows = nrow(G), cols = ncol(G));
-    Y = -2 * (G) + (diag(G) * I) + (I * t(diag(G)));
-#   Y = -2 * (G) + t(I %*% diag(diag(G))) + t(diag(diag(G)) %*% I);
-    Y = sqrt(Y);
-    Y = replace(target = Y, pattern=0/0, replacement = 0);
-}
+s = var(A);
+write(s, $5);


[systemds] 03/03: [MINOR] Federated Modifications

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 914b8f8966879c274ca30130a24d502c08f59b6c
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Nov 14 10:29:08 2020 +0100

    [MINOR] Federated Modifications
    
    Major reduction in federated tests, by redusing startup time of
    federated tests with multiple workers.
    
    Furthermore a timeout is added to funtions tests allowing only 60 minutes
    of execution time before being forcefully terminated.
    This reduce the waiting time for feedback of tests that anyway would
    timeout after 6 hours.
    
    Isolate Function Test in workflows,
    and stabilize negative federated test,
    and reduce Federated Kmeans Tests
    
    Privacy monitor added a null pointer check that happens if the object on
    the federated site becomes null. This error would result in stack traces
    that were hard to debug.
    
    Fix :bug: in federated right indexing if the indexing aligns to a split
    between locations.
---
 .github/workflows/functionsTests.yml               |  4 +-
 .../fed/MatrixIndexingFEDInstruction.java          |  4 +-
 .../sysds/runtime/privacy/PrivacyMonitor.java      |  2 +
 .../org/apache/sysds/test/AutomatedTestBase.java   | 15 ++++-
 .../federated/algorithms/FederatedBivarTest.java   |  6 +-
 .../federated/algorithms/FederatedCorTest.java     |  6 +-
 .../federated/algorithms/FederatedGLMTest.java     |  2 +-
 .../federated/algorithms/FederatedKmeansTest.java  | 22 ++++---
 .../federated/algorithms/FederatedL2SVMTest.java   |  2 +-
 .../federated/algorithms/FederatedLogRegTest.java  |  2 +-
 .../federated/algorithms/FederatedPCATest.java     |  6 +-
 .../federated/algorithms/FederatedUnivarTest.java  |  6 +-
 .../federated/algorithms/FederatedVarTest.java     |  6 +-
 .../federated/algorithms/FederatedYL2SVMTest.java  |  2 +-
 .../federated/io/FederatedReaderTest.java          |  2 +-
 .../functions/federated/io/FederatedSSLTest.java   |  2 +-
 .../federated/io/FederatedWriterTest.java          |  2 +-
 .../paramserv/FederatedParamservTest.java          | 63 ++++++++++--------
 .../primitives/FederatedBinaryMatrixTest.java      |  2 +-
 .../primitives/FederatedBinaryVectorTest.java      |  2 +-
 .../primitives/FederatedCastToFrameTest.java       |  2 +-
 .../primitives/FederatedCastToMatrixTest.java      |  2 +-
 .../primitives/FederatedCentralMomentTest.java     |  8 +--
 ...ateTest.java => FederatedColAggregateTest.java} | 74 ++++++---------------
 .../primitives/FederatedFullAggregateTest.java     |  8 ++-
 .../primitives/FederatedMultiplyTest.java          |  2 +-
 .../primitives/FederatedNegativeTest.java          | 28 +++++---
 .../federated/primitives/FederatedRCBindTest.java  |  4 +-
 .../primitives/FederatedRightIndexTest.java        | 41 ++++++------
 ...ateTest.java => FederatedRowAggregateTest.java} | 75 ++++++----------------
 .../federated/primitives/FederatedSplitTest.java   |  4 +-
 .../primitives/FederatedStatisticsTest.java        |  2 +-
 .../TransformFederatedEncodeApplyTest.java         |  6 +-
 .../TransformFederatedEncodeDecodeTest.java        |  6 +-
 34 files changed, 192 insertions(+), 228 deletions(-)

diff --git a/.github/workflows/functionsTests.yml b/.github/workflows/functionsTests.yml
index a094652..c816245 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -32,13 +32,15 @@ on:
 jobs:
   applicationsTests:
     runs-on: ${{ matrix.os }}
+    timeout-minutes: 60
     strategy:
       fail-fast: false
       matrix:
         tests: [
           "**.functions.aggregate.**,**.functions.append.**,**.functions.binary.frame.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**",
           "**.functions.blocks.**,**.functions.compress.**,**.functions.countDistinct.**,**.functions.data.misc.**,**.functions.data.rand.**,**.functions.data.tensor.**,**.functions.codegenalg.parttwo.**,**.functions.codegen.**,**.functions.caching.**",
-          "**.functions.federated.**,**.functions.binary.matrix_full_cellwise.**,**.functions.binary.matrix_full_other.**",
+          "**.functions.binary.matrix_full_cellwise.**,**.functions.binary.matrix_full_other.**",
+          "**.functions.federated.**",
           "**.functions.codegenalg.partone.**",
           "**.functions.builtin.**",
           "**.functions.frame.**,**.functions.indexing.**,**.functions.io.**,**.functions.jmlc.**,**.functions.lineage.**",
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
index bc2c066..5c0a821 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
@@ -77,9 +77,9 @@ public final class MatrixIndexingFEDInstruction extends IndexingFEDInstruction {
 				curFedRange.setBeginDim(0, Math.max(rs - ixrange.rowStart, 0));
 				curFedRange.setBeginDim(1, Math.max(cs - ixrange.colStart, 0));
 				curFedRange.setEndDim(0,
-					(ixrange.rowEnd > re ? re - ixrange.rowStart : ixrange.rowEnd - ixrange.rowStart + 1));
+					(ixrange.rowEnd >= re ? re - ixrange.rowStart : ixrange.rowEnd - ixrange.rowStart + 1));
 				curFedRange.setEndDim(1,
-					(ixrange.colEnd > ce ? ce - ixrange.colStart : ixrange.colEnd - ixrange.colStart + 1));
+					(ixrange.colEnd >= ce ? ce - ixrange.colStart : ixrange.colEnd - ixrange.colStart + 1));
 				if(LOG.isDebugEnabled()) {
 					LOG.debug("Fed Mapping After      : " + curFedRange);
 				}
diff --git a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
index 4e286d0..97ac22b 100644
--- a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
+++ b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
@@ -65,6 +65,8 @@ public class PrivacyMonitor
 	 * @return data object or data object with privacy constraint removed in case the privacy level was none. 
 	 */
 	public static Data handlePrivacy(Data dataObject){
+		if(dataObject == null)
+			return null;
 		PrivacyConstraint privacyConstraint = dataObject.getPrivacyConstraint();
 		if (privacyConstraint != null){
 			PrivacyLevel privacyLevel = privacyConstraint.getPrivacyLevel();
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index c3a0c59..3c3471e 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -1421,6 +1421,19 @@ public abstract class AutomatedTestBase {
 	 * @return the thread associated with the worker.
 	 */
 	protected Thread startLocalFedWorkerThread(int port) {
+		return startLocalFedWorkerThread(port, FED_WORKER_WAIT);
+	}
+
+	/**
+	 * Start a thread for a worker. This will share the same JVM, so all static variables will be shared.!
+	 * 
+	 * Also when using the local Fed Worker thread the statistics printing, and clearing from the worker is disabled.
+	 * 
+	 * @param port Port to use
+	 * @param sleep  The amount of time to wait for the worker startup. in Milliseconds
+	 * @return the thread associated with the worker.
+	 */
+	protected Thread startLocalFedWorkerThread(int port, int sleep) {
 		Thread t = null;
 		String[] fedWorkArgs = {"-w", Integer.toString(port)};
 		ArrayList<String> args = new ArrayList<>();
@@ -1443,7 +1456,7 @@ public abstract class AutomatedTestBase {
 				}
 			});
 			t.start();
-			java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
+			java.util.concurrent.TimeUnit.MILLISECONDS.sleep(sleep);
 		}
 		catch(InterruptedException e) {
 			e.printStackTrace();
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index ced8bca..ff811e0 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -114,9 +114,9 @@ public class FederatedBivarTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
index 15383b2..82437b1 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
@@ -102,9 +102,9 @@ public class FederatedCorTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index 1e608ce..eb8aee8 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -95,7 +95,7 @@ public class FederatedGLMTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index 8a33d20..f296b3a 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -19,10 +19,8 @@
 
 package org.apache.sysds.test.functions.federated.algorithms;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ExecMode;
@@ -33,9 +31,11 @@ import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
@@ -64,9 +64,10 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// rows have to be even and > 1
-		return Arrays.asList(new Object[][] {{10000, 10, 1, 1},
+		return Arrays.asList(new Object[][] {
+			// {10000, 10, 1, 1},
 			// {2000, 50, 1, 1}, {1000, 100, 1, 1},
-			{10000, 10, 2, 1},
+			// {10000, 10, 2, 1},
 			// {2000, 50, 2, 1}, {1000, 100, 2, 1}, //concurrent requests
 			{10000, 10, 2, 2}, // repeated exec
 			// TODO more runs e.g., 16 -> but requires rework RPC framework first
@@ -80,6 +81,7 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void federatedKmeansHybrid() {
 		federatedKmeans(Types.ExecMode.HYBRID);
 	}
@@ -102,7 +104,7 @@ public class FederatedKmeansTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index 53bfc8d..f17754e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -99,7 +99,7 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index 42c614b..e7f1f80 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -95,7 +95,7 @@ public class FederatedLogRegTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 99c90ee..8438bb6 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -102,9 +102,9 @@ public class FederatedPCATest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index 588796a..7333533 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -100,9 +100,9 @@ public class FederatedUnivarTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
index 280f0d3..46af1c9 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
@@ -109,9 +109,9 @@ public class FederatedVarTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
index 0657e50..d0eaf87 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
@@ -104,7 +104,7 @@ public class FederatedYL2SVMTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index d4dc464..2587fe9 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -87,7 +87,7 @@ public class FederatedReaderTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 		String host = "localhost";
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
index 6ec2f40..d086174 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
@@ -93,7 +93,7 @@ public class FederatedSSLTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 		String host = "localhost";
 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index c8a50fe..a83fad3 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -83,7 +83,7 @@ public class FederatedWriterTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		try {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
index c4d04ea..3015aaa 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
@@ -37,7 +37,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedParamservTest extends AutomatedTestBase {
@@ -60,15 +59,12 @@ public class FederatedParamservTest extends AutomatedTestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> parameters() {
 		return Arrays.asList(new Object[][] {
-			//Network type, number of federated workers, examples per worker, batch size, epochs, learning rate, update type, update frequency
-			{"TwoNN", 2, 2, 1, 5, 0.01, "BSP", "BATCH"},
-			{"TwoNN", 2, 2, 1, 5, 0.01, "ASP", "BATCH"},
-			{"TwoNN", 2, 2, 1, 5, 0.01, "BSP", "EPOCH"},
-			{"TwoNN", 2, 2, 1, 5, 0.01, "ASP", "EPOCH"},
-			{"CNN", 2, 2, 1, 5, 0.01, "BSP", "BATCH"},
-			{"CNN", 2, 2, 1, 5, 0.01, "ASP", "BATCH"},
-			{"CNN", 2, 2, 1, 5, 0.01, "BSP", "EPOCH"},
-			{"CNN", 2, 2, 1, 5, 0.01, "ASP", "EPOCH"},
+			// Network type, number of federated workers, examples per worker, batch size, epochs, learning rate, update
+			// type, update frequency
+			{"TwoNN", 2, 2, 1, 5, 0.01, "BSP", "BATCH"}, {"TwoNN", 2, 2, 1, 5, 0.01, "ASP", "BATCH"},
+			{"TwoNN", 2, 2, 1, 5, 0.01, "BSP", "EPOCH"}, {"TwoNN", 2, 2, 1, 5, 0.01, "ASP", "EPOCH"},
+			{"CNN", 2, 2, 1, 5, 0.01, "BSP", "BATCH"}, {"CNN", 2, 2, 1, 5, 0.01, "ASP", "BATCH"},
+			{"CNN", 2, 2, 1, 5, 0.01, "BSP", "EPOCH"}, {"CNN", 2, 2, 1, 5, 0.01, "ASP", "EPOCH"},
 			{"TwoNN", 5, 1000, 200, 2, 0.01, "BSP", "BATCH"},
 			// {"TwoNN", 5, 1000, 200, 2, 0.01, "ASP", "BATCH"},
 			// {"TwoNN", 5, 1000, 200, 2, 0.01, "BSP", "EPOCH"},
@@ -80,7 +76,8 @@ public class FederatedParamservTest extends AutomatedTestBase {
 		});
 	}
 
-	public FederatedParamservTest(String networkType, int numFederatedWorkers, int examplesPerWorker, int batch_size, int epochs, double eta, String utype, String freq) {
+	public FederatedParamservTest(String networkType, int numFederatedWorkers, int examplesPerWorker, int batch_size,
+		int epochs, double eta, String utype, String freq) {
 		_networkType = networkType;
 		_numFederatedWorkers = numFederatedWorkers;
 		_examplesPerWorker = examplesPerWorker;
@@ -101,12 +98,12 @@ public class FederatedParamservTest extends AutomatedTestBase {
 	public void federatedParamservSingleNode() {
 		federatedParamserv(ExecMode.SINGLE_NODE);
 	}
-	
+
 	@Test
 	public void federatedParamservHybrid() {
 		federatedParamserv(ExecMode.HYBRID);
 	}
-	
+
 	private void federatedParamserv(ExecMode mode) {
 		// config
 		getAndLoadTestConfiguration(TEST_NAME);
@@ -114,18 +111,17 @@ public class FederatedParamservTest extends AutomatedTestBase {
 		setOutputBuffering(true);
 
 		int C = 1, Hin = 28, Win = 28;
-		int numFeatures = C*Hin*Win;
+		int numFeatures = C * Hin * Win;
 		int numLabels = 10;
 
 		ExecMode platformOld = setExecMode(mode);
-		
+
 		try {
-		
+
 			// dml name
 			fullDMLScriptName = HOME + TEST_NAME + ".dml";
 			// generate program args
-			List<String> programArgsList = new ArrayList<>(Arrays.asList(
-				"-stats",
+			List<String> programArgsList = new ArrayList<>(Arrays.asList("-stats",
 				"-nvargs",
 				"examples_per_worker=" + _examplesPerWorker,
 				"num_features=" + numFeatures,
@@ -138,28 +134,39 @@ public class FederatedParamservTest extends AutomatedTestBase {
 				"network_type=" + _networkType,
 				"channels=" + C,
 				"hin=" + Hin,
-				"win=" + Win
-			));
-	
+				"win=" + Win));
+
 			// for each worker
 			List<Integer> ports = new ArrayList<>();
 			List<Thread> threads = new ArrayList<>();
 			for(int i = 0; i < _numFederatedWorkers; i++) {
 				// write row partitioned features to disk
-				writeInputMatrixWithMTD("X" + i, generateDummyMNISTFeatures(_examplesPerWorker, C, Hin, Win), false,
-						new MatrixCharacteristics(_examplesPerWorker, numFeatures, _blocksize, _examplesPerWorker * numFeatures));
+				writeInputMatrixWithMTD("X" + i,
+					generateDummyMNISTFeatures(_examplesPerWorker, C, Hin, Win),
+					false,
+					new MatrixCharacteristics(_examplesPerWorker, numFeatures, _blocksize,
+						_examplesPerWorker * numFeatures));
 				// write row partitioned labels to disk
-				writeInputMatrixWithMTD("y" + i, generateDummyMNISTLabels(_examplesPerWorker, numLabels), false,
-						new MatrixCharacteristics(_examplesPerWorker, numLabels, _blocksize, _examplesPerWorker * numLabels));
-	
+				writeInputMatrixWithMTD("y" + i,
+					generateDummyMNISTLabels(_examplesPerWorker, numLabels),
+					false,
+					new MatrixCharacteristics(_examplesPerWorker, numLabels, _blocksize,
+						_examplesPerWorker * numLabels));
+
 				// start worker
 				ports.add(getRandomAvailablePort());
-				threads.add(startLocalFedWorkerThread(ports.get(i)));
-	
+				threads.add(startLocalFedWorkerThread(ports.get(i), 10));
+
 				// add worker to program args
 				programArgsList.add("X" + i + "=" + TestUtils.federatedAddress(ports.get(i), input("X" + i)));
 				programArgsList.add("y" + i + "=" + TestUtils.federatedAddress(ports.get(i), input("y" + i)));
 			}
+			try {
+				Thread.sleep(1000);
+			}
+			catch(InterruptedException e) {
+				e.printStackTrace();
+			}
 	
 			programArgs = programArgsList.toArray(new String[0]);
 			LOG.debug(runTest(null));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index 11f2bd4..958c09b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -95,7 +95,7 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index f11b7be..e8dd6f7 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -96,7 +96,7 @@ public class FederatedBinaryVectorTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
index b67cc93..fe03906 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
@@ -97,7 +97,7 @@ public class FederatedCastToFrameTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
index 57ffacf..fa51d89 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
@@ -126,7 +126,7 @@ public class FederatedCastToMatrixTest extends AutomatedTestBase {
 
 			int port1 = getRandomAvailablePort();
 			int port2 = getRandomAvailablePort();
-			Thread t1 = startLocalFedWorkerThread(port1);
+			Thread t1 = startLocalFedWorkerThread(port1, 10);
 			Thread t2 = startLocalFedWorkerThread(port2);
 
 			TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
index 4d644ce..828718e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
@@ -98,10 +98,10 @@ public class FederatedCentralMomentTest extends AutomatedTestBase {
         int port2 = getRandomAvailablePort();
         int port3 = getRandomAvailablePort();
         int port4 = getRandomAvailablePort();
-        Thread t1 = startLocalFedWorkerThread(port1);
-        Thread t2 = startLocalFedWorkerThread(port2);
-        Thread t3 = startLocalFedWorkerThread(port3);
-        Thread t4 = startLocalFedWorkerThread(port4);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
+		Thread t4 = startLocalFedWorkerThread(port4);
 
         // reference file should not be written to hdfs, so we set platform here
         rtplatform = execMode;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
similarity index 70%
copy from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
copy to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
index 31800af..a8480e9 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
@@ -36,20 +36,15 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedRowColAggregateTest extends AutomatedTestBase {
+public class FederatedColAggregateTest extends AutomatedTestBase {
 	private final static String TEST_NAME1 = "FederatedColSumTest";
 	private final static String TEST_NAME2 = "FederatedColMeanTest";
 	private final static String TEST_NAME3 = "FederatedColMaxTest";
 	private final static String TEST_NAME4 = "FederatedColMinTest";
-	private final static String TEST_NAME5 = "FederatedRowSumTest";
-	private final static String TEST_NAME6 = "FederatedRowMeanTest";
-	private final static String TEST_NAME7 = "FederatedRowMaxTest";
-	private final static String TEST_NAME8 = "FederatedRowMinTest";
-	private final static String TEST_NAME9 = "FederatedRowVarTest";
 	private final static String TEST_NAME10 = "FederatedColVarTest";
 
 	private final static String TEST_DIR = "functions/federated/aggregate/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowColAggregateTest.class.getSimpleName() + "/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedColAggregateTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
 	@Parameterized.Parameter()
@@ -72,10 +67,6 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		SUM, MEAN, MAX, MIN, VAR
 	}
 
-	private enum InstType {
-		ROW, COL
-	}
-
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
@@ -83,65 +74,36 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME9, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME9, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME10, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME10, new String[] {"S"}));
 	}
 
 	@Test
 	public void testColSumDenseMatrixCP() {
-		runAggregateOperationTest(OpType.SUM, InstType.COL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.SUM, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testColMeanDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MEAN, InstType.COL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MEAN, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testColMaxDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MAX, InstType.COL, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testRowSumDenseMatrixCP() {
-		runAggregateOperationTest(OpType.SUM, InstType.ROW, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testRowMeanDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MEAN, InstType.ROW, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MAX, ExecMode.SINGLE_NODE);
 	}
 
-	@Test
-	public void testRowMaxDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MAX, InstType.ROW, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testRowMinDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MIN, InstType.ROW, ExecMode.SINGLE_NODE);
-	}
 
 	@Test
 	public void testColMinDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MIN, InstType.COL, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testRowVarDenseMatrixCP() {
-		runAggregateOperationTest(OpType.VAR, InstType.ROW, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MIN, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testColVarDenseMatrixCP() {
-		runAggregateOperationTest(OpType.VAR, InstType.COL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.VAR, ExecMode.SINGLE_NODE);
 	}
 
-	private void runAggregateOperationTest(OpType type, InstType instr, ExecMode execMode) {
+	private void runAggregateOperationTest(OpType type, ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
 
@@ -151,19 +113,19 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		String TEST_NAME = null;
 		switch(type) {
 			case SUM:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME1 : TEST_NAME5;
+				TEST_NAME = TEST_NAME1;
 				break;
 			case MEAN:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME2 : TEST_NAME6;
+				TEST_NAME = TEST_NAME2;
 				break;
 			case MAX:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME3 : TEST_NAME7;
+				TEST_NAME = TEST_NAME3;
 				break;
 			case MIN:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME4 : TEST_NAME8;
+				TEST_NAME = TEST_NAME4;
 				break;
 			case VAR:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME10 : TEST_NAME9;
+				TEST_NAME = TEST_NAME10;
 				break;
 		}
 
@@ -195,9 +157,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
@@ -227,9 +189,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(type == FederatedRowColAggregateTest.OpType.VAR ? 1e-2 : 1e-9);
+		compareResults(type == FederatedColAggregateTest.OpType.VAR ? 1e-2 : 1e-9);
 
-		String fedInst = instr == InstType.COL ? "fed_uac" : "fed_uar";
+		String fedInst = "fed_uac";
 
 		switch(type) {
 			case SUM:
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
index ec7bda6..d388913 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -108,6 +108,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void testSumDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.SUM, ExecType.SPARK);
 	}
@@ -131,6 +132,7 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void testVarDenseMatrixSP() {
 		runColAggregateOperationTest(OpType.VAR, ExecType.SPARK);
 	}
@@ -196,9 +198,9 @@ public class FederatedFullAggregateTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index 4170914..3bc2649 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -103,7 +103,7 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
index 2ebe0c8..59dfcff 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
@@ -19,27 +19,37 @@
 
 package org.apache.sysds.test.functions.federated.primitives;
 
-import org.apache.sysds.common.Types;
-import org.apache.sysds.runtime.controlprogram.federated.*;
-import org.apache.sysds.test.AutomatedTestBase;
-import org.apache.sysds.test.TestUtils;
-import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
 
 @net.jcip.annotations.NotThreadSafe
 public class FederatedNegativeTest {
 	@Test
 	public void NegativeTest1() {
 		int port = AutomatedTestBase.getRandomAvailablePort();
-		String[] args = {"-w", Integer.toString(port)};
-		Thread t = AutomatedTestBase.startLocalFedWorkerWithArgs(args);
+		Thread t = null;
+		try{
+			String[] args = {"-w", Integer.toString(port)};
+			t = AutomatedTestBase.startLocalFedWorkerWithArgs(args);
+		} catch(Exception e){
+			NegativeTest1();
+		}
 		FederationUtils.resetFedDataID(); //ensure expected ID when tests run in single JVM
 		Map<FederatedRange, FederatedData> fedMap = new HashMap<>();
 		FederatedRange r = new FederatedRange(new long[]{0,0}, new long[]{1,1});
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index abf37eb..540b188 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -93,8 +93,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
 		writeInputMatrixWithMTD("B", B, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols));
 
 		int port1 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		int port2 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();		
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
index d5f81e9..b9e7f62 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
@@ -65,14 +65,7 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {
-			// {20, 10, 6, 8, true},
-			{20, 10, 1, 1, true},
-			{20, 10, 2, 10, true},
-			// {20, 10, 2, 10, true},
-			// {20, 12, 2, 10, false}, 
-			// {20, 12, 1, 4, false}
-		});
+		return Arrays.asList(new Object[][] {{20, 10, 1, 1, true}, {20, 10, 3, 5, true}, {10, 12, 1, 10, false}});
 	}
 
 	private enum IndexType {
@@ -87,15 +80,15 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
 	}
 
-	@Test
-	public void testRightIndexRightDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.RIGHT, ExecMode.SINGLE_NODE);
-	}
+	// @Test
+	// public void testRightIndexRightDenseMatrixCP() {
+	// runAggregateOperationTest(IndexType.RIGHT, ExecMode.SINGLE_NODE);
+	// }
 
-	@Test
-	public void testRightIndexLeftDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.LEFT, ExecMode.SINGLE_NODE);
-	}
+	// @Test
+	// public void testRightIndexLeftDenseMatrixCP() {
+	// runAggregateOperationTest(IndexType.LEFT, ExecMode.SINGLE_NODE);
+	// }
 
 	@Test
 	public void testRightIndexFullDenseMatrixCP() {
@@ -112,13 +105,19 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		String TEST_NAME = null;
 		switch(type) {
 			case RIGHT:
+				from = from <= cols ? from : cols;
+				to = to <= cols ? to : cols;
 				TEST_NAME = TEST_NAME1;
 				break;
 			case LEFT:
+				from = from <= rows ? from : rows;
+				to = to <= rows ? to : rows;
 				TEST_NAME = TEST_NAME2;
 				break;
 			case FULL:
 				TEST_NAME = TEST_NAME3;
+				from = from <= rows && from <= cols ? from : Math.min(rows, cols);
+				to = to <= rows && to <= cols ? to : Math.min(rows, cols);
 				break;
 		}
 
@@ -150,9 +149,9 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
@@ -163,6 +162,10 @@ public class FederatedRightIndexTest extends AutomatedTestBase {
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 
+		if(from > to) {
+			from = to;
+		}
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("X3"), input("X4"), String.valueOf(from),
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
similarity index 70%
rename from src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
index 31800af..49e692e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
@@ -36,20 +36,15 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedRowColAggregateTest extends AutomatedTestBase {
-	private final static String TEST_NAME1 = "FederatedColSumTest";
-	private final static String TEST_NAME2 = "FederatedColMeanTest";
-	private final static String TEST_NAME3 = "FederatedColMaxTest";
-	private final static String TEST_NAME4 = "FederatedColMinTest";
+public class FederatedRowAggregateTest extends AutomatedTestBase {
 	private final static String TEST_NAME5 = "FederatedRowSumTest";
 	private final static String TEST_NAME6 = "FederatedRowMeanTest";
 	private final static String TEST_NAME7 = "FederatedRowMaxTest";
 	private final static String TEST_NAME8 = "FederatedRowMinTest";
 	private final static String TEST_NAME9 = "FederatedRowVarTest";
-	private final static String TEST_NAME10 = "FederatedColVarTest";
 
 	private final static String TEST_DIR = "functions/federated/aggregate/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowColAggregateTest.class.getSimpleName() + "/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowAggregateTest.class.getSimpleName() + "/";
 
 	private final static int blocksize = 1024;
 	@Parameterized.Parameter()
@@ -72,76 +67,42 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		SUM, MEAN, MAX, MIN, VAR
 	}
 
-	private enum InstType {
-		ROW, COL
-	}
-
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME9, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME9, new String[] {"S"}));
-		addTestConfiguration(TEST_NAME10, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME10, new String[] {"S"}));
-	}
-
-	@Test
-	public void testColSumDenseMatrixCP() {
-		runAggregateOperationTest(OpType.SUM, InstType.COL, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testColMeanDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MEAN, InstType.COL, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testColMaxDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MAX, InstType.COL, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testRowSumDenseMatrixCP() {
-		runAggregateOperationTest(OpType.SUM, InstType.ROW, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.SUM, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testRowMeanDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MEAN, InstType.ROW, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MEAN, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testRowMaxDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MAX, InstType.ROW, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MAX, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testRowMinDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MIN, InstType.ROW, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testColMinDenseMatrixCP() {
-		runAggregateOperationTest(OpType.MIN, InstType.COL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.MIN, ExecMode.SINGLE_NODE);
 	}
 
 	@Test
 	public void testRowVarDenseMatrixCP() {
-		runAggregateOperationTest(OpType.VAR, InstType.ROW, ExecMode.SINGLE_NODE);
-	}
-
-	@Test
-	public void testColVarDenseMatrixCP() {
-		runAggregateOperationTest(OpType.VAR, InstType.COL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(OpType.VAR, ExecMode.SINGLE_NODE);
 	}
 
-	private void runAggregateOperationTest(OpType type, InstType instr, ExecMode execMode) {
+	private void runAggregateOperationTest(OpType type, ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
 
@@ -151,19 +112,19 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		String TEST_NAME = null;
 		switch(type) {
 			case SUM:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME1 : TEST_NAME5;
+				TEST_NAME = TEST_NAME5;
 				break;
 			case MEAN:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME2 : TEST_NAME6;
+				TEST_NAME = TEST_NAME6;
 				break;
 			case MAX:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME3 : TEST_NAME7;
+				TEST_NAME = TEST_NAME7;
 				break;
 			case MIN:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME4 : TEST_NAME8;
+				TEST_NAME = TEST_NAME8;
 				break;
 			case VAR:
-				TEST_NAME = instr == InstType.COL ? TEST_NAME10 : TEST_NAME9;
+				TEST_NAME = TEST_NAME9;
 				break;
 		}
 
@@ -195,9 +156,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2, 10);
+		Thread t3 = startLocalFedWorkerThread(port3, 10);
 		Thread t4 = startLocalFedWorkerThread(port4);
 
 		rtplatform = execMode;
@@ -227,9 +188,9 @@ public class FederatedRowColAggregateTest extends AutomatedTestBase {
 		runTest(true, false, null, -1);
 
 		// compare via files
-		compareResults(type == FederatedRowColAggregateTest.OpType.VAR ? 1e-2 : 1e-9);
+		compareResults(type == FederatedRowAggregateTest.OpType.VAR ? 1e-2 : 1e-9);
 
-		String fedInst = instr == InstType.COL ? "fed_uac" : "fed_uar";
+		String fedInst = "fed_uar";
 
 		switch(type) {
 			case SUM:
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
index 9c4b6d0..9d37aff 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
@@ -98,8 +98,8 @@ public class FederatedSplitTest extends AutomatedTestBase {
 
         int port1 = getRandomAvailablePort();
         int port2 = getRandomAvailablePort();
-        Thread t1 = startLocalFedWorkerThread(port1);
-        Thread t2 = startLocalFedWorkerThread(port2);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
+		Thread t2 = startLocalFedWorkerThread(port2);
 
         // Run reference dml script with normal matrix
         fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
index 99e649f..865582d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
@@ -99,7 +99,7 @@ public class FederatedStatisticsTest extends AutomatedTestBase {
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t1 = startLocalFedWorkerThread(port1, 10);
 		Thread t2 = startLocalFedWorkerThread(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
index 3aa0981..b7036d0 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
@@ -196,12 +196,12 @@ public class TransformFederatedEncodeApplyTest extends AutomatedTestBase {
 			getAndLoadTestConfiguration(TEST_NAME1);
 
 			int port1 = getRandomAvailablePort();
-			t1 = startLocalFedWorkerThread(port1);
 			int port2 = getRandomAvailablePort();
-			t2 = startLocalFedWorkerThread(port2);
 			int port3 = getRandomAvailablePort();
-			t3 = startLocalFedWorkerThread(port3);
 			int port4 = getRandomAvailablePort();
+			t1 = startLocalFedWorkerThread(port1, 10);
+			t2 = startLocalFedWorkerThread(port2, 10);
+			t3 = startLocalFedWorkerThread(port3, 10);
 			t4 = startLocalFedWorkerThread(port4);
 
 			FileFormatPropertiesCSV ffpCSV = new FileFormatPropertiesCSV(true, DataExpression.DEFAULT_DELIM_DELIMITER,
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
index 0c8ec1f..458dbc1 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
@@ -131,12 +131,12 @@ public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
 			getAndLoadTestConfiguration(TEST_NAME_RECODE);
 
 			int port1 = getRandomAvailablePort();
-			t1 = startLocalFedWorkerThread(port1);
 			int port2 = getRandomAvailablePort();
-			t2 = startLocalFedWorkerThread(port2);
 			int port3 = getRandomAvailablePort();
-			t3 = startLocalFedWorkerThread(port3);
 			int port4 = getRandomAvailablePort();
+			t1 = startLocalFedWorkerThread(port1, 10);
+			t2 = startLocalFedWorkerThread(port2, 10);
+			t3 = startLocalFedWorkerThread(port3, 10);
 			t4 = startLocalFedWorkerThread(port4);
 
 			// schema