You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by se...@apache.org on 2022/01/28 13:55:58 UTC

[systemds] branch main updated: [MINOR] Add Federated Compilation Options

This is an automated email from the ASF dual-hosted git repository.

sebwrede pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 16a506a  [MINOR] Add Federated Compilation Options
16a506a is described below

commit 16a506a6dd7eedf5b949c0aefc9ee3c9ceb78b5b
Author: sebwrede <sw...@know-center.at>
AuthorDate: Thu Jan 20 16:54:57 2022 +0100

    [MINOR] Add Federated Compilation Options
    
    This commit adds options for compiling federated execution plans which are needed for testing and experiment purposes.
    Additionally, various log messages are added and exception handling changed to make federated executions easier to debug.
    
    Closes #1528.
---
 src/main/java/org/apache/sysds/api/DMLOptions.java |  29 ++
 src/main/java/org/apache/sysds/api/DMLScript.java  |   2 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |   4 +
 .../java/org/apache/sysds/hops/OptimizerUtils.java |   3 +
 .../hops/ipa/IPAPassRewriteFederatedPlan.java      |  42 ++-
 .../java/org/apache/sysds/hops/ipa/MemoTable.java  |  15 +
 .../hops/rewrite/RewriteFederatedExecution.java    |   8 +-
 .../controlprogram/federated/FederatedWorker.java  |  14 +-
 .../federated/FederatedWorkerHandler.java          |  20 +-
 .../instructions/fed/FEDInstructionUtils.java      | 310 +++++++++++----------
 10 files changed, 277 insertions(+), 170 deletions(-)

diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java b/src/main/java/org/apache/sysds/api/DMLOptions.java
index 1b911bc..55773ed 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -31,6 +31,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
+import org.apache.sysds.runtime.instructions.fed.FEDInstructionUtils;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.utils.Explain;
@@ -74,6 +76,7 @@ public class DMLOptions {
 	public int                  pythonPort    = -1; 
 	public boolean              checkPrivacy  = false;            // Check which privacy constraints are loaded and checked during federated execution 
 	public boolean				federatedCompilation = false;     // Compile federated instructions based on input federation state and privacy constraints.
+	public boolean				noFedRuntimeConversion = false;   // If activated, no runtime conversion of CP instructions to FED instructions will be performed.
 
 	public final static DMLOptions defaultOptions = new DMLOptions(null);
 
@@ -103,6 +106,7 @@ public class DMLOptions {
 			", lineage=" + lineage +
 			", w=" + fedWorker +
 			", federatedCompilation=" + federatedCompilation +
+			", noFedRuntimeConversion=" + noFedRuntimeConversion +
 			'}';
 	}
 	
@@ -266,11 +270,30 @@ public class DMLOptions {
 		}
 
 		dmlOptions.checkPrivacy = line.hasOption("checkPrivacy");
+
 		if (line.hasOption("federatedCompilation")){
 			OptimizerUtils.FEDERATED_COMPILATION = true;
 			dmlOptions.federatedCompilation = true;
+			String[] fedCompSpecs = line.getOptionValues("federatedCompilation");
+			if ( fedCompSpecs != null && fedCompSpecs.length > 0 ){
+				for ( String spec : fedCompSpecs ){
+					String[] specPair = spec.split("=");
+					if (specPair.length != 2){
+						throw new org.apache.commons.cli.ParseException("Invalid argument specified for -federatedCompilation option, must be a list of space separated K=V pairs, where K is a line number of the DML script and V is a federated output value");
+					}
+					int dmlLineNum = Integer.parseInt(specPair[0]);
+					FEDInstruction.FederatedOutput fedOutSpec = FEDInstruction.FederatedOutput.valueOf(specPair[1]);
+					OptimizerUtils.FEDERATED_SPECS.put(dmlLineNum,fedOutSpec);
+				}
+			}
 		}
 
+		if ( line.hasOption("noFedRuntimeConversion") ){
+			FEDInstructionUtils.noFedRuntimeConversion = true;
+			dmlOptions.noFedRuntimeConversion = true;
+		}
+
+
 		return dmlOptions;
 	}
 	
@@ -325,8 +348,13 @@ public class DMLOptions {
 			.withDescription("Check which privacy constraints are loaded and checked during federated execution")
 			.create("checkPrivacy");
 		Option federatedCompilation = OptionBuilder
+			.withArgName("key=value")
 			.withDescription("Compile federated instructions based on input federation state and privacy constraints.")
+			.hasOptionalArgs()
 			.create("federatedCompilation");
+		Option noFedRuntimeConversion = OptionBuilder
+			.withDescription("If activated, no runtime conversion of CP instructions to FED instructions will be performed.")
+			.create("noFedRuntimeConversion");
 		
 		options.addOption(configOpt);
 		options.addOption(cleanOpt);
@@ -341,6 +369,7 @@ public class DMLOptions {
 		options.addOption(fedOpt);
 		options.addOption(checkPrivacy);
 		options.addOption(federatedCompilation);
+		options.addOption(noFedRuntimeConversion);
 
 		// Either a clean(-clean), a file(-f), a script(-s) or help(-help) needs to be specified
 		OptionGroup fileOrScriptOpt = new OptionGroup()
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java b/src/main/java/org/apache/sysds/api/DMLScript.java
index 4184db1..c2ef42c 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -278,7 +278,7 @@ public class DMLScript
 			if(dmlOptions.fedWorker) {
 				loadConfiguration(fnameOptConfig);
 				try {
-					new FederatedWorker(dmlOptions.fedWorkerPort).run();
+					new FederatedWorker(dmlOptions.fedWorkerPort, dmlOptions.debug).run();
 				}
 				catch(CertificateException e) {
 					e.printStackTrace();
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java b/src/main/java/org/apache/sysds/hops/Hop.java
index f47fcff..037bfa5 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -203,6 +203,10 @@ public abstract class Hop implements ParseInfo {
 		activatePrefetch = true;
 	}
 
+	public void deactivatePrefetch(){
+		activatePrefetch = false;
+	}
+
 	/**
 	 * Checks if prefetch is activated for this hop.
 	 * @return true if prefetch is activated
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 703b9f2..47b5822 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -52,6 +52,7 @@ import org.apache.sysds.runtime.functionobjects.IntegerDivide;
 import org.apache.sysds.runtime.functionobjects.Modulus;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -60,6 +61,7 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 
 public class OptimizerUtils 
 {
@@ -215,6 +217,7 @@ public class OptimizerUtils
 	 * Compile federated instructions based on input federation state and privacy constraints.
 	 */
 	public static boolean FEDERATED_COMPILATION = false;
+	public static Map<Integer, FEDInstruction.FederatedOutput> FEDERATED_SPECS = new HashMap<>();
 	
 	/**
 	 * Specifies a multiplier computing the degree of parallelism of parallel
diff --git a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
index 59333ab..0939b25 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.hops.ipa;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.AggUnaryOp;
 import org.apache.sysds.hops.BinaryOp;
@@ -54,6 +56,7 @@ import java.util.Set;
  * The rewrite is only applied if federated compilation is activated in OptimizerUtils.
  */
 public class IPAPassRewriteFederatedPlan extends IPAPass {
+	private static final Log LOG = LogFactory.getLog(IPAPassRewriteFederatedPlan.class.getName());
 
 	private final static MemoTable hopRelMemo = new MemoTable();
 	private final static Set<Long> hopRelUpdatedFinal = new HashSet<>();
@@ -238,10 +241,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
 	private void updateFederatedOutput(Hop root, HopRel updateHopRel) {
 		root.setFederatedOutput(updateHopRel.getFederatedOutput());
 		root.setFederatedCost(updateHopRel.getCostObject());
+		forceFixedFedOut(root);
 		hopRelUpdatedFinal.add(root.getHopID());
 	}
 
 	/**
+	 * Set federated output to fixed value if FEDERATED_SPECS is activated for root hop.
+	 * @param root hop set to fixed fedout value as loaded from FEDERATED_SPECS
+	 */
+	private void forceFixedFedOut(Hop root){
+		if ( OptimizerUtils.FEDERATED_SPECS.containsKey(root.getBeginLine()) ){
+			FEDInstruction.FederatedOutput fedOutSpec = OptimizerUtils.FEDERATED_SPECS.get(root.getBeginLine());
+			root.setFederatedOutput(fedOutSpec);
+			if ( fedOutSpec.isForcedFederated() )
+				root.deactivatePrefetch();
+		}
+	}
+
+	/**
 	 * Select federated execution plan for every Hop in the DAG starting from given roots.
 	 * The cost estimates of the hops are also updated when FederatedOutput is updated in the hops.
 	 *
@@ -259,8 +276,10 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
 	 * @param root starting point for going through the Hop DAG to update the federatedOutput fields
 	 */
 	private void selectFederatedExecutionPlan(Hop root) {
-		visitFedPlanHop(root);
-		setFinalFedout(root);
+		if ( root != null ){
+			visitFedPlanHop(root);
+			setFinalFedout(root);
+		}
 	}
 
 	/**
@@ -274,6 +293,7 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
 			return;
 		// If the currentHop has input, then the input should be visited depth-first
 		if(currentHop.getInput() != null && currentHop.getInput().size() > 0) {
+			debugLog(currentHop);
 			for(Hop input : currentHop.getInput())
 				visitFedPlanHop(input);
 		}
@@ -290,6 +310,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
 	}
 
 	/**
+	 * Write HOP visit to debug log if debug is activated.
+	 * @param currentHop hop written to log
+	 */
+	private void debugLog(Hop currentHop){
+		if ( LOG.isDebugEnabled() ){
+			LOG.debug("Visiting HOP: " + currentHop + " Input size: " + currentHop.getInput().size());
+			int index = 0;
+			for ( Hop hop : currentHop.getInput()){
+				if ( hop == null )
+					LOG.debug("Input at index is null: " + index);
+				else
+					LOG.debug("HOP input: " + hop + " at index " + index + " of " + currentHop);
+				index++;
+			}
+		}
+	}
+
+	/**
 	 * Checks if the instructions related to the given hop supports FOUT/LOUT processing.
 	 *
 	 * @param hop to check for federated support
diff --git a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
index c1aeff6..fc95c29 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
@@ -115,4 +115,19 @@ public class MemoTable {
 			&& hopRelMemo.get(root.getHopRef().getHopID()).stream()
 			.anyMatch(h -> h.getFederatedOutput() == root.getFederatedOutput());
 	}
+
+	@Override
+	public String toString(){
+		StringBuilder sb = new StringBuilder();
+		sb.append("Federated MemoTable has ").append(hopRelMemo.size()).append(" entries with the following values:");
+		sb.append("\n").append("{").append("\n");
+		for (Map.Entry<Long,List<HopRel>> hopEntry : hopRelMemo.entrySet()){
+			sb.append("  ").append(hopEntry.getKey()).append(":").append("\n");
+			for ( HopRel hopRel : hopEntry.getValue() ){
+				sb.append("    ").append(hopRel.getFederatedOutput()).append(" ").append(hopRel.getCost()).append("\n");
+			}
+		}
+		sb.append("\n");
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
index 4de2557..6288130 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
@@ -22,6 +22,7 @@ package org.apache.sysds.hops.rewrite;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.sysds.api.DMLException;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.LiteralOp;
@@ -53,6 +54,7 @@ import java.util.ArrayList;
 import java.util.concurrent.Future;
 
 public class RewriteFederatedExecution extends HopRewriteRule {
+	private static final Logger LOG = Logger.getLogger(RewriteFederatedExecution.class);
 
 	@Override
 	public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state) {
@@ -72,6 +74,8 @@ public class RewriteFederatedExecution extends HopRewriteRule {
 		if (hop.isVisited())
 			return;
 
+		LOG.debug("RewriteFederatedExecution visitHop + " + hop);
+
 		// Depth first to get to the input
 		for ( Hop input : hop.getInput() )
 			visitHop(input);
@@ -98,11 +102,13 @@ public class RewriteFederatedExecution extends HopRewriteRule {
 	private static void loadFederatedPrivacyConstraints(Hop hop){
 		if ( hop.isFederatedDataOp() && hop.getPrivacy() == null){
 			try {
+				LOG.debug("Load privacy constraints of " + hop);
 				PrivacyConstraint privConstraint = unwrapPrivConstraint(sendPrivConstraintRequest(hop));
+				LOG.debug("PrivacyConstraint retrieved: " + privConstraint);
 				hop.setPrivacy(privConstraint);
 			}
 			catch(Exception e) {
-				throw new DMLException(e.getMessage());
+				throw new DMLException(e);
 			}
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index de0e1d8..4090684 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -47,24 +47,25 @@ import org.apache.sysds.conf.DMLConfig;
 public class FederatedWorker {
 	protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
-	private int _port;
+	private final int _port;
 	private final FederatedLookupTable _flt;
 	private final FederatedReadCache _frc;
+	private final boolean _debug;
 
-	public FederatedWorker(int port) {
+	public FederatedWorker(int port, boolean debug) {
 		_flt = new FederatedLookupTable();
 		_frc = new FederatedReadCache();
 		_port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
+		_debug = debug;
 	}
 
 	public void run() throws CertificateException, SSLException {
-		log.info("Setting up Federated Worker");
+		log.info("Setting up Federated Worker on port " + _port);
 		final int EVENT_LOOP_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 4);
 		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
 		ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
 			10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true));
 		NioEventLoopGroup workerGroup = new NioEventLoopGroup(EVENT_LOOP_THREADS, workerTPE);
-
 		ServerBootstrap b = new ServerBootstrap();
 		// TODO add ability to use real ssl files, not self signed certificates.
 		SelfSignedCertificate cert = new SelfSignedCertificate();
@@ -94,7 +95,10 @@ public class FederatedWorker {
 			f.channel().closeFuture().sync();
 		}
 		catch(Exception e) {
-			log.info("Federated worker interrupted");
+			log.error("Federated worker interrupted");
+			log.error(e.getMessage());
+			if ( _debug )
+				e.printStackTrace();
 		}
 		finally {
 			log.info("Federated Worker Shutting down.");
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 9f7de1f..ca2b055 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -129,6 +129,8 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		}
 		catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
 			// Here we control the error message, therefore it is allowed to send the stack trace with the response
+			LOG.error("Exception in FederatedWorkerHandler while processing requests:\n"
+				+ Arrays.toString(requests), ex);
 			return new FederatedResponse(ResponseType.ERROR, ex);
 		}
 		catch(Exception ex) {
@@ -417,16 +419,16 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		ExecutionContext ec = ecm.get(request.getTID());
 
 		// get function and input parameters
-		FederatedUDF udf = (FederatedUDF) request.getParam(0);
-		Data[] inputs = Arrays.stream(udf.getInputIDs()).mapToObj(id -> ec.getVariable(String.valueOf(id)))
-			.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
+		try {
+			FederatedUDF udf = (FederatedUDF) request.getParam(0);
+			Data[] inputs = Arrays.stream(udf.getInputIDs()).mapToObj(id -> ec.getVariable(String.valueOf(id)))
+				.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
 
-		// trace lineage
-		if(DMLScript.LINEAGE)
-			LineageItemUtils.traceFedUDF(ec, udf);
+			// trace lineage
+			if(DMLScript.LINEAGE)
+				LineageItemUtils.traceFedUDF(ec, udf);
 
-		// reuse or execute user-defined function
-		try {
+			// reuse or execute user-defined function
 			// reuse UDF outputs if available in lineage cache
 			FederatedResponse reuse = LineageCache.reuse(udf, ec);
 			if(reuse.isSuccessful())
@@ -441,6 +443,8 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 			return res;
 		}
 		catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
+			LOG.debug("FederatedWorkerHandler Privacy Constraint " +
+				"exception thrown when processing EXEC_UDF request ", ex);
 			throw ex;
 		}
 		catch(Exception ex) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 64f1f9a..5915033 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -94,6 +94,8 @@ public class FEDInstructionUtils {
 	
 	private static String[] PARAM_BUILTINS = new String[]{
 		"replace", "rmempty", "lowertri", "uppertri", "transformdecode", "transformapply", "tokenize"};
+
+	public static boolean noFedRuntimeConversion = false;
 	
 	// private static final Log LOG = LogFactory.getLog(FEDInstructionUtils.class.getName());
 
@@ -109,178 +111,180 @@ public class FEDInstructionUtils {
 	 * @return The potentially modified instruction
 	 */
 	public static Instruction checkAndReplaceCP(Instruction inst, ExecutionContext ec) {
-		FEDInstruction fedinst = null;
-		if (inst instanceof AggregateBinaryCPInstruction) {
-			AggregateBinaryCPInstruction instruction = (AggregateBinaryCPInstruction) inst;
-			if( instruction.input1.isMatrix() && instruction.input2.isMatrix()) {
-				MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
-				MatrixObject mo2 = ec.getMatrixObject(instruction.input2);
-				if ( (mo1.isFederated(FType.ROW) && mo1.isFederatedExcept(FType.BROADCAST))
-					|| (mo2.isFederated(FType.ROW) && mo2.isFederatedExcept(FType.BROADCAST))
-					|| (mo1.isFederated(FType.COL) && mo1.isFederatedExcept(FType.BROADCAST))) {
-					fedinst = AggregateBinaryFEDInstruction.parseInstruction(
-						InstructionUtils.concatOperands(inst.getInstructionString(), FederatedOutput.NONE.name()));
+		if ( !noFedRuntimeConversion ){
+			FEDInstruction fedinst = null;
+			if (inst instanceof AggregateBinaryCPInstruction) {
+				AggregateBinaryCPInstruction instruction = (AggregateBinaryCPInstruction) inst;
+				if( instruction.input1.isMatrix() && instruction.input2.isMatrix()) {
+					MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
+					MatrixObject mo2 = ec.getMatrixObject(instruction.input2);
+					if ( (mo1.isFederated(FType.ROW) && mo1.isFederatedExcept(FType.BROADCAST))
+						|| (mo2.isFederated(FType.ROW) && mo2.isFederatedExcept(FType.BROADCAST))
+						|| (mo1.isFederated(FType.COL) && mo1.isFederatedExcept(FType.BROADCAST))) {
+						fedinst = AggregateBinaryFEDInstruction.parseInstruction(
+							InstructionUtils.concatOperands(inst.getInstructionString(), FederatedOutput.NONE.name()));
+					}
 				}
 			}
-		}
-		else if( inst instanceof MMChainCPInstruction) {
-			MMChainCPInstruction linst = (MMChainCPInstruction) inst;
-			MatrixObject mo = ec.getMatrixObject(linst.input1);
-			if( mo.isFederated(FType.ROW) )
-				fedinst = MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
-		}
-		else if( inst instanceof MMTSJCPInstruction ) {
-			MMTSJCPInstruction linst = (MMTSJCPInstruction) inst;
-			MatrixObject mo = ec.getMatrixObject(linst.input1);
-			if( (mo.isFederated(FType.ROW) && mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
-				(mo.isFederated(FType.COL) && mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
-				fedinst = TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
-		}
-		else if (inst instanceof UnaryCPInstruction && ! (inst instanceof IndexingCPInstruction)) {
-			UnaryCPInstruction instruction = (UnaryCPInstruction) inst;
-			if(inst instanceof ReorgCPInstruction && (inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
-				|| inst.getOpcode().equals("rev"))) {
-				ReorgCPInstruction rinst = (ReorgCPInstruction) inst;
-				CacheableData<?> mo = ec.getCacheableData(rinst.input1);
-
-				if((mo instanceof MatrixObject || mo instanceof FrameObject)
-					&& mo.isFederatedExcept(FType.BROADCAST) )
-					fedinst = ReorgFEDInstruction.parseInstruction(
-						InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+			else if( inst instanceof MMChainCPInstruction) {
+				MMChainCPInstruction linst = (MMChainCPInstruction) inst;
+				MatrixObject mo = ec.getMatrixObject(linst.input1);
+				if( mo.isFederated(FType.ROW) )
+					fedinst = MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
 			}
-			else if(instruction.input1 != null && instruction.input1.isMatrix()
-				&& ec.containsVariable(instruction.input1)) {
+			else if( inst instanceof MMTSJCPInstruction ) {
+				MMTSJCPInstruction linst = (MMTSJCPInstruction) inst;
+				MatrixObject mo = ec.getMatrixObject(linst.input1);
+				if( (mo.isFederated(FType.ROW) && mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
+					(mo.isFederated(FType.COL) && mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
+					fedinst = TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
+			}
+			else if (inst instanceof UnaryCPInstruction && ! (inst instanceof IndexingCPInstruction)) {
+				UnaryCPInstruction instruction = (UnaryCPInstruction) inst;
+				if(inst instanceof ReorgCPInstruction && (inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
+					|| inst.getOpcode().equals("rev"))) {
+					ReorgCPInstruction rinst = (ReorgCPInstruction) inst;
+					CacheableData<?> mo = ec.getCacheableData(rinst.input1);
 
-				MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
-				if( mo1.isFederatedExcept(FType.BROADCAST) ) {
-					if(instruction.getOpcode().equalsIgnoreCase("cm"))
-						fedinst = CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
-					else if(inst.getOpcode().equalsIgnoreCase("qsort")) {
-						if(mo1.getFedMapping().getFederatedRanges().length == 1)
-							fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+					if((mo instanceof MatrixObject || mo instanceof FrameObject)
+						&& mo.isFederatedExcept(FType.BROADCAST) )
+						fedinst = ReorgFEDInstruction.parseInstruction(
+							InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+				}
+				else if(instruction.input1 != null && instruction.input1.isMatrix()
+					&& ec.containsVariable(instruction.input1)) {
+
+					MatrixObject mo1 = ec.getMatrixObject(instruction.input1);
+					if( mo1.isFederatedExcept(FType.BROADCAST) ) {
+						if(instruction.getOpcode().equalsIgnoreCase("cm"))
+							fedinst = CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
+						else if(inst.getOpcode().equalsIgnoreCase("qsort")) {
+							if(mo1.getFedMapping().getFederatedRanges().length == 1)
+								fedinst = QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+						}
+						else if(inst.getOpcode().equalsIgnoreCase("rshape"))
+							fedinst = ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
+						else if(inst instanceof AggregateUnaryCPInstruction &&
+							((AggregateUnaryCPInstruction) instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
+							fedinst = AggregateUnaryFEDInstruction.parseInstruction(
+								InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+						else if(inst instanceof UnaryMatrixCPInstruction) {
+							if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
+								!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
+								fedinst = UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
+						}
 					}
-					else if(inst.getOpcode().equalsIgnoreCase("rshape"))
-						fedinst = ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
-					else if(inst instanceof AggregateUnaryCPInstruction &&
-						((AggregateUnaryCPInstruction) instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
-						fedinst = AggregateUnaryFEDInstruction.parseInstruction(
+				}
+			}
+			else if (inst instanceof BinaryCPInstruction) {
+				BinaryCPInstruction instruction = (BinaryCPInstruction) inst;
+				if( (instruction.input1.isMatrix() && ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
+					|| (instruction.input2.isMatrix() && ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
+					if(instruction.getOpcode().equals("append") )
+						fedinst = AppendFEDInstruction.parseInstruction(inst.getInstructionString());
+					else if(instruction.getOpcode().equals("qpick"))
+						fedinst = QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
+					else if("cov".equals(instruction.getOpcode()) && (ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
+						ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
+						fedinst = CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
+					else
+						fedinst = BinaryFEDInstruction.parseInstruction(
 							InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
-					else if(inst instanceof UnaryMatrixCPInstruction) {
-						if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
-							!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
-							fedinst = UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
-					}
 				}
 			}
-		}
-		else if (inst instanceof BinaryCPInstruction) {
-			BinaryCPInstruction instruction = (BinaryCPInstruction) inst;
-			if( (instruction.input1.isMatrix() && ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
-				|| (instruction.input2.isMatrix() && ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
-				if(instruction.getOpcode().equals("append") )
-					fedinst = AppendFEDInstruction.parseInstruction(inst.getInstructionString());
-				else if(instruction.getOpcode().equals("qpick"))
-					fedinst = QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
-				else if("cov".equals(instruction.getOpcode()) && (ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
-					ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
-					fedinst = CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
-				else
-					fedinst = BinaryFEDInstruction.parseInstruction(
-						InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+			else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
+				ParameterizedBuiltinCPInstruction pinst = (ParameterizedBuiltinCPInstruction) inst;
+				if( ArrayUtils.contains(PARAM_BUILTINS, pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
+					fedinst = ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
 			}
-		}
-		else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
-			ParameterizedBuiltinCPInstruction pinst = (ParameterizedBuiltinCPInstruction) inst;
-			if( ArrayUtils.contains(PARAM_BUILTINS, pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
-				fedinst = ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
-		}
-		else if (inst instanceof MultiReturnParameterizedBuiltinCPInstruction) {
-			MultiReturnParameterizedBuiltinCPInstruction minst = (MultiReturnParameterizedBuiltinCPInstruction) inst;
-			if(minst.getOpcode().equals("transformencode") && minst.input1.isFrame()) {
-				CacheableData<?> fo = ec.getCacheableData(minst.input1);
-				if(fo.isFederatedExcept(FType.BROADCAST)) {
-					fedinst = MultiReturnParameterizedBuiltinFEDInstruction
-						.parseInstruction(minst.getInstructionString());
+			else if (inst instanceof MultiReturnParameterizedBuiltinCPInstruction) {
+				MultiReturnParameterizedBuiltinCPInstruction minst = (MultiReturnParameterizedBuiltinCPInstruction) inst;
+				if(minst.getOpcode().equals("transformencode") && minst.input1.isFrame()) {
+					CacheableData<?> fo = ec.getCacheableData(minst.input1);
+					if(fo.isFederatedExcept(FType.BROADCAST)) {
+						fedinst = MultiReturnParameterizedBuiltinFEDInstruction
+							.parseInstruction(minst.getInstructionString());
+					}
 				}
 			}
-		}
-		else if(inst instanceof IndexingCPInstruction) {
-			// matrix and frame indexing
-			IndexingCPInstruction minst = (IndexingCPInstruction) inst;
-			if((minst.input1.isMatrix() || minst.input1.isFrame())
-				&& ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
-				fedinst = IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+			else if(inst instanceof IndexingCPInstruction) {
+				// matrix and frame indexing
+				IndexingCPInstruction minst = (IndexingCPInstruction) inst;
+				if((minst.input1.isMatrix() || minst.input1.isFrame())
+					&& ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
+					fedinst = IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+				}
 			}
-		}
-		else if(inst instanceof TernaryCPInstruction) {
-			TernaryCPInstruction tinst = (TernaryCPInstruction) inst;
-			if(inst.getOpcode().equals("_map") && inst instanceof TernaryFrameScalarCPInstruction && !inst.getInstructionString().contains("UtilFunctions")
-				&& tinst.input1.isFrame() && ec.getFrameObject(tinst.input1).isFederated()) {
-				long margin = ec.getScalarInput(tinst.input3).getLongValue();
-				FrameObject fo = ec.getFrameObject(tinst.input1);
-				if(margin == 0 || (fo.isFederated(FType.ROW) && margin == 1) || (fo.isFederated(FType.COL) && margin == 2))
-					fedinst = TernaryFrameScalarFEDInstruction
-						.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+			else if(inst instanceof TernaryCPInstruction) {
+				TernaryCPInstruction tinst = (TernaryCPInstruction) inst;
+				if(inst.getOpcode().equals("_map") && inst instanceof TernaryFrameScalarCPInstruction && !inst.getInstructionString().contains("UtilFunctions")
+					&& tinst.input1.isFrame() && ec.getFrameObject(tinst.input1).isFederated()) {
+					long margin = ec.getScalarInput(tinst.input3).getLongValue();
+					FrameObject fo = ec.getFrameObject(tinst.input1);
+					if(margin == 0 || (fo.isFederated(FType.ROW) && margin == 1) || (fo.isFederated(FType.COL) && margin == 2))
+						fedinst = TernaryFrameScalarFEDInstruction
+							.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+				}
+				else if((tinst.input1.isMatrix() && ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
+					|| (tinst.input2.isMatrix() && ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
+					|| (tinst.input3.isMatrix() && ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
+					fedinst = TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+				}
 			}
-			else if((tinst.input1.isMatrix() && ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
-				|| (tinst.input2.isMatrix() && ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
-				|| (tinst.input3.isMatrix() && ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
-				fedinst = TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+			else if(inst instanceof VariableCPInstruction ){
+				VariableCPInstruction ins = (VariableCPInstruction) inst;
+				if(ins.getVariableOpcode() == VariableOperationCode.Write
+					&& ins.getInput1().isMatrix()
+					&& ins.getInput3().getName().contains("federated")){
+					fedinst = VariableFEDInstruction.parseInstruction(ins);
+				}
+				else if(ins.getVariableOpcode() == VariableOperationCode.CastAsFrameVariable
+					&& ins.getInput1().isMatrix()
+					&& ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+					fedinst = VariableFEDInstruction.parseInstruction(ins);
+				}
+				else if(ins.getVariableOpcode() == VariableOperationCode.CastAsMatrixVariable
+					&& ins.getInput1().isFrame()
+					&& ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+					fedinst = VariableFEDInstruction.parseInstruction(ins);
+				}
 			}
-		}
-		else if(inst instanceof VariableCPInstruction ){
-			VariableCPInstruction ins = (VariableCPInstruction) inst;
-			if(ins.getVariableOpcode() == VariableOperationCode.Write
-				&& ins.getInput1().isMatrix()
-				&& ins.getInput3().getName().contains("federated")){
-				fedinst = VariableFEDInstruction.parseInstruction(ins);
-			}
-			else if(ins.getVariableOpcode() == VariableOperationCode.CastAsFrameVariable
-				&& ins.getInput1().isMatrix()
-				&& ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
-				fedinst = VariableFEDInstruction.parseInstruction(ins);
-			}
-			else if(ins.getVariableOpcode() == VariableOperationCode.CastAsMatrixVariable
-				&& ins.getInput1().isFrame()
-				&& ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
-				fedinst = VariableFEDInstruction.parseInstruction(ins);
+			else if(inst instanceof AggregateTernaryCPInstruction){
+				AggregateTernaryCPInstruction ins = (AggregateTernaryCPInstruction) inst;
+				if(ins.input1.isMatrix() && ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
+					&& ins.input2.isMatrix() && ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
+					fedinst = AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+				}
 			}
-		}
-		else if(inst instanceof AggregateTernaryCPInstruction){
-			AggregateTernaryCPInstruction ins = (AggregateTernaryCPInstruction) inst;
-			if(ins.input1.isMatrix() && ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
-				&& ins.input2.isMatrix() && ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
-				fedinst = AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+			else if(inst instanceof QuaternaryCPInstruction) {
+				QuaternaryCPInstruction instruction = (QuaternaryCPInstruction) inst;
+				Data data = ec.getVariable(instruction.input1);
+				if(data instanceof MatrixObject && ((MatrixObject) data).isFederatedExcept(FType.BROADCAST))
+					fedinst = QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
 			}
-		}
-		else if(inst instanceof QuaternaryCPInstruction) {
-			QuaternaryCPInstruction instruction = (QuaternaryCPInstruction) inst;
-			Data data = ec.getVariable(instruction.input1);
-			if(data instanceof MatrixObject && ((MatrixObject) data).isFederatedExcept(FType.BROADCAST))
-				fedinst = QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
-		}
-		else if(inst instanceof SpoofCPInstruction) {
-			SpoofCPInstruction ins = (SpoofCPInstruction) inst;
-			Class<?> scla = ins.getOperatorClass().getSuperclass();
-			if(((scla == SpoofCellwise.class || scla == SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
+			else if(inst instanceof SpoofCPInstruction) {
+				SpoofCPInstruction ins = (SpoofCPInstruction) inst;
+				Class<?> scla = ins.getOperatorClass().getSuperclass();
+				if(((scla == SpoofCellwise.class || scla == SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
 					&& SpoofFEDInstruction.isFederated(ec, ins.getInputs(), scla))
-				|| (scla == SpoofRowwise.class && SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
-				fedinst = SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+					|| (scla == SpoofRowwise.class && SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
+					fedinst = SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+				}
+			}
+			else if(inst instanceof CtableCPInstruction) {
+				CtableCPInstruction cinst = (CtableCPInstruction) inst;
+				if(inst.getOpcode().equalsIgnoreCase("ctable")
+					&& ( ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
+					|| (cinst.input2.isMatrix() && ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
+					|| (cinst.input3.isMatrix() && ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
+					fedinst = CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
 			}
-		}
-		else if(inst instanceof CtableCPInstruction) {
-			CtableCPInstruction cinst = (CtableCPInstruction) inst;
-			if(inst.getOpcode().equalsIgnoreCase("ctable")
-				&& ( ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
-				|| (cinst.input2.isMatrix() && ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
-				|| (cinst.input3.isMatrix() && ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
-				fedinst = CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
-		}
 
-		//set thread id for federated context management
-		if( fedinst != null ) {
-			fedinst.setTID(ec.getTID());
-			return fedinst;
+			//set thread id for federated context management
+			if( fedinst != null ) {
+				fedinst.setTID(ec.getTID());
+				return fedinst;
+			}
 		}
 
 		return inst;