You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@systemml.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:09:46 UTC

[GitHub] [systemml] Baunsgaard commented on a change in pull request #966: [WIP][SYSTEMML-229] Fed frame recode transform (encode) support

Baunsgaard commented on a change in pull request #966:
URL: https://github.com/apache/systemml/pull/966#discussion_r437441066



##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		_outputs = outputs;
+	}
+
+	public CPOperand getOutput(int i) {
+		return _outputs.get(i);
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// obtain and pin input frame
+		FrameObject fin = ec.getFrameObject(input1.getName());
+		String spec = ec.getScalarInput(input2).getStringValue();
+
+		Map<FederatedRange, FederatedData> fedMapping = fin.getFedMapping();
+
+		// first we use the spec to construct a meta frame which will provide us with info about the encodings
+		List<Pair<FederatedRange, Future<FederatedResponse>>> metaFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.ENCODE_META, spec, entry.getKey().getBeginDimsInt()[1] + 1), true);
+			metaFutures.add(new ImmutablePair<>(entry.getKey(), response));
+		}
+
+		// TODO support encodings other than recode
+		// the combined mappings for the frame columns (because we only support recode)
+		Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) fin.getNumColumns()];
+		try {
+			for(Pair<FederatedRange, Future<FederatedResponse>> pair : metaFutures) {
+				FederatedRange range = pair.getKey();
+				FederatedResponse federatedResponse = pair.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FrameBlock fb = (FrameBlock) federatedResponse.getData()[0];
+					combineRecodeMaps(combinedRecodeMaps, fb, range.getBeginDimsInt()[1]);
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated meta frame creation failed: " + e.getMessage());
+		}
+
+		// construct a single meta frameblock out of the multiple HashMaps with the recodings
+		FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+		// actually encode the frame block and construct an encoded matrix block at worker
+		List<Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			FederatedRange fedRange = entry.getKey();
+			int columnStart = (int) fedRange.getBeginDims()[1];
+			int columnEnd = (int) fedRange.getEndDims()[1];
+			
+			// Slice out relevant meta part
+			// range is inclusive
+			FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() - 1, columnStart, columnEnd - 1, null);
+			
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.FRAME_ENCODE, slicedMeta, spec, columnStart + 1), true);
+			encodedFutures.add(new ImmutablePair<>(entry, response));
+		}
+		
+		// construct a federated matrix with the encoded data
+		MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+		transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+		Map<FederatedRange, FederatedData> transformedFedMapping = new HashMap<>();
+		try {
+			for(Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>> data : encodedFutures) {
+				FederatedResponse federatedResponse = data.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FederatedRange federatedRange = data.getKey().getKey();
+					FederatedData federatedData = data.getKey().getValue();
+					long varId = (long) federatedResponse.getData()[0];
+					
+					transformedFedMapping.put(federatedRange, new FederatedData(federatedData, varId));
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated transform apply failed: " + e.getMessage());
+		}
+		// set the federated mapping for the matrix
+		transformedMat.setFedMapping(transformedFedMapping);
+
+		// release input and outputs
+		ec.setFrameOutput(getOutput(1).getName(), meta);
+	}
+
+	private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] combinedRecodeMaps) {
+		int rows = 0;
+		for(Map<String, Long> map : combinedRecodeMaps) {
+			if(map != null) {
+				rows = Integer.max(rows, map.size());
+			}
+		}
+		FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, Types.ValueType.STRING);
+		fb.ensureAllocatedColumns(rows);
+
+		// find maximum number of elements needed for a column
+		int c = -1;
+		for(Map<String, Long> map : combinedRecodeMaps) {

Review comment:
       Maybe parallelize this loop per column?

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		_outputs = outputs;
+	}
+
+	public CPOperand getOutput(int i) {
+		return _outputs.get(i);
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// obtain and pin input frame
+		FrameObject fin = ec.getFrameObject(input1.getName());
+		String spec = ec.getScalarInput(input2).getStringValue();
+
+		Map<FederatedRange, FederatedData> fedMapping = fin.getFedMapping();
+
+		// first we use the spec to construct a meta frame which will provide us with info about the encodings
+		List<Pair<FederatedRange, Future<FederatedResponse>>> metaFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.ENCODE_META, spec, entry.getKey().getBeginDimsInt()[1] + 1), true);
+			metaFutures.add(new ImmutablePair<>(entry.getKey(), response));
+		}
+
+		// TODO support encodings other than recode
+		// the combined mappings for the frame columns (because we only support recode)
+		Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) fin.getNumColumns()];
+		try {
+			for(Pair<FederatedRange, Future<FederatedResponse>> pair : metaFutures) {
+				FederatedRange range = pair.getKey();
+				FederatedResponse federatedResponse = pair.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FrameBlock fb = (FrameBlock) federatedResponse.getData()[0];
+					combineRecodeMaps(combinedRecodeMaps, fb, range.getBeginDimsInt()[1]);
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated meta frame creation failed: " + e.getMessage());
+		}
+
+		// construct a single meta frameblock out of the multiple HashMaps with the recodings
+		FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+		// actually encode the frame block and construct an encoded matrix block at worker
+		List<Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			FederatedRange fedRange = entry.getKey();
+			int columnStart = (int) fedRange.getBeginDims()[1];
+			int columnEnd = (int) fedRange.getEndDims()[1];
+			
+			// Slice out relevant meta part
+			// range is inclusive
+			FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() - 1, columnStart, columnEnd - 1, null);
+			
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.FRAME_ENCODE, slicedMeta, spec, columnStart + 1), true);
+			encodedFutures.add(new ImmutablePair<>(entry, response));
+		}
+		
+		// construct a federated matrix with the encoded data
+		MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+		transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+		Map<FederatedRange, FederatedData> transformedFedMapping = new HashMap<>();
+		try {
+			for(Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>> data : encodedFutures) {
+				FederatedResponse federatedResponse = data.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FederatedRange federatedRange = data.getKey().getKey();
+					FederatedData federatedData = data.getKey().getValue();
+					long varId = (long) federatedResponse.getData()[0];
+					
+					transformedFedMapping.put(federatedRange, new FederatedData(federatedData, varId));
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated transform apply failed: " + e.getMessage());
+		}
+		// set the federated mapping for the matrix
+		transformedMat.setFedMapping(transformedFedMapping);
+
+		// release input and outputs
+		ec.setFrameOutput(getOutput(1).getName(), meta);
+	}
+
+	private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] combinedRecodeMaps) {
+		int rows = 0;
+		for(Map<String, Long> map : combinedRecodeMaps) {
+			if(map != null) {
+				rows = Integer.max(rows, map.size());
+			}
+		}
+		FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, Types.ValueType.STRING);
+		fb.ensureAllocatedColumns(rows);

Review comment:
       Am I missing something, or shouldn't the number of rows already be known before this function is called?

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
##########
@@ -115,6 +131,77 @@ private FederatedResponse constructResponse(FederatedRequest request) {
 		}
 	}
 
+	private FederatedResponse createFrameEncodeMeta(FederatedRequest request) {
+		// param parsing
+		checkNumParams(request.getNumParams(), 3);
+		String spec = (String) request.getParam(0);
+		int globalOffset = (int) request.getParam(1);
+		long varID = (long) request.getParam(2);
+
+		FrameObject fo = (FrameObject) PrivacyMonitor.handlePrivacy(_vars.get(varID));
+		FrameBlock data = fo.acquireRead();
+		String[] colNames = data.getColumnNames();
+
+		// create the encoder
+		Encoder encoder = EncoderFactory.createEncoder(spec,
+			colNames,
+			data.getNumColumns(),
+			null,
+			globalOffset,
+			globalOffset + data.getNumColumns());
+		// build necessary structures for encoding
+		encoder.build(data);
+		// just get the meta frame block
+		FrameBlock meta = encoder.getMetaData(new FrameBlock(data.getNumColumns(), Types.ValueType.STRING));
+		meta.setColumnNames(colNames);
+		// otherwise data of FrameBlock would be null, therefore it would fail
+		// hack because serialization of FrameBlock does not function if Arrays are not allocated
+		meta.ensureAllocatedColumns(meta.getNumRows());
+		fo.release();
+
+		return new FederatedResponse(FederatedResponse.Type.SUCCESS, meta);
+	}
+
+	private FederatedResponse executeFrameEncode(FederatedRequest request) {

Review comment:
       I'm not really a fan of having this method located inside the worker handler.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		_outputs = outputs;
+	}
+
+	public CPOperand getOutput(int i) {
+		return _outputs.get(i);
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));

Review comment:
       Make this a federated output here so that the result stays federated.

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
##########
@@ -142,7 +142,7 @@ public void federatedConstruction(Types.ExecMode execMode, String testFile, Stri
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
 		fullDMLScriptName = HOME + testFile + ".dml";
-		programArgs = new String[] {"-args", "\"localhost:" + port + "/" + input(inputIdentifier) + "\"",
+		programArgs = new String[] {"-args", TestUtils.federatedAddress("localhost", port, input(inputIdentifier)),

Review comment:
       I like this TestUtils helper for the federated address. Could you use it in more of the federated tests?

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		_outputs = outputs;
+	}
+
+	public CPOperand getOutput(int i) {
+		return _outputs.get(i);
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// obtain and pin input frame
+		FrameObject fin = ec.getFrameObject(input1.getName());
+		String spec = ec.getScalarInput(input2).getStringValue();
+
+		Map<FederatedRange, FederatedData> fedMapping = fin.getFedMapping();
+
+		// first we use the spec to construct a meta frame which will provide us with info about the encodings
+		List<Pair<FederatedRange, Future<FederatedResponse>>> metaFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.ENCODE_META, spec, entry.getKey().getBeginDimsInt()[1] + 1), true);
+			metaFutures.add(new ImmutablePair<>(entry.getKey(), response));
+		}
+
+		// TODO support encodings other than recode
+		// the combined mappings for the frame columns (because we only support recode)
+		Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) fin.getNumColumns()];
+		try {
+			for(Pair<FederatedRange, Future<FederatedResponse>> pair : metaFutures) {
+				FederatedRange range = pair.getKey();
+				FederatedResponse federatedResponse = pair.getValue().get();
+				if(federatedResponse.isSuccessful()) {

Review comment:
       Is there ever an else case for this if? If there is, shouldn't we throw an exception?

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
##########
@@ -642,6 +642,9 @@ public void setColumn(int c, Array column) {
 
 	///////
 	// serialization / deserialization (implementation of writable and externalizable)
+	// FIXME for FrameBlock fix write and readFields, it does not work if the Arrays are not yet
+	// allocated (after fixing remove hack in FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
+	// FrameBlock.ensureAllocatedColumns())

Review comment:
       Maybe file a Jira issue for this?

##########
File path: src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
+	private static final String TEST_NAME1 = "TransformFederatedEncodeDecode";
+	private static final String TEST_DIR = "functions/transform/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + TransformFederatedEncodeDecodeTest.class.getSimpleName()
+		+ "/";
+
+	private static final String SPEC = "TransformEncodeDecodeSpec.json";
+
+	private static final int rows = 1234;
+	private static final int cols = 2;
+	private static final double sparsity1 = 0.9;
+	private static final double sparsity2 = 0.1;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"FO"}));
+	}
+
+	@Test
+	public void runTestCSVDenseCP() {
+		runTransformEncodeDecodeTest(false, Types.FileFormat.CSV);
+	}
+
+	@Test
+	public void runTestCSVSparseCP() {
+		runTransformEncodeDecodeTest(true, Types.FileFormat.CSV);
+	}
+
+	@Test
+	public void runTestTextcellDenseCP() {
+		runTransformEncodeDecodeTest(false, Types.FileFormat.TEXT);
+	}
+
+	@Test
+	public void runTestTextcellSparseCP() {
+		runTransformEncodeDecodeTest(true, Types.FileFormat.TEXT);
+	}
+
+	@Test
+	public void runTestBinaryDenseCP() {
+		runTransformEncodeDecodeTest(false, Types.FileFormat.BINARY);
+	}
+
+	@Test
+	public void runTestBinarySparseCP() {
+		runTransformEncodeDecodeTest(true, Types.FileFormat.BINARY);
+	}
+
+	private void runTransformEncodeDecodeTest(boolean sparse, Types.FileFormat format) {
+		ExecMode platformOld = rtplatform;
+		rtplatform = ExecMode.SINGLE_NODE;
+
+		Thread t1 = null, t2 = null;
+		try {
+			getAndLoadTestConfiguration(TEST_NAME1);
+
+			int port1 = getRandomAvailablePort();
+			t1 = startLocalFedWorker(port1);
+			int port2 = getRandomAvailablePort();
+			t2 = startLocalFedWorker(port2);
+
+			// schema
+			Types.ValueType[] schema = new Types.ValueType[cols / 2];
+			Arrays.fill(schema, Types.ValueType.FP64);
+			// generate and write input data
+			// A is the data that will be aggregated and not recoded
+			double[][] A = TestUtils.round(getRandomMatrix(rows, cols / 2, 1, 15, sparse ? sparsity2 : sparsity1, 7));
+			writeInputFrameWithMTD("A", A, false, schema, format);
+
+			// B will be recoded and will be the column that will be grouped by
+			Arrays.fill(schema, Types.ValueType.STRING);
+			// we set sparsity to 1.0 to ensure all the string labels exist
+			double[][] B = TestUtils.round(getRandomMatrix(rows, cols / 2, 1, 15, 1.0, 8));
+			writeInputFrameWithMTD("B", B, false, schema, format);
+
+			fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME1 + ".dml";
+
+			programArgs = new String[] {"-explain", "-args", TestUtils.federatedAddress("localhost", port1, input("A")),

Review comment:
       If you can live without passing -explain in tests, that would be great, since it tends to write a little too much to the terminal, which in turn crashes our test suite in the CI workflows.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates the instruction.
+	 *
+	 * @param op      operator (parseInstruction passes null for transformencode)
+	 * @param input1  first input operand
+	 * @param input2  second input operand
+	 * @param outputs all output operands; the first one is also handed to the super constructor
+	 * @param opcode  instruction opcode
+	 * @param istr    original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		// the super constructor takes a single output operand, so only the first output is passed there
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		this._outputs = outputs;
+	}
+
+	/**
+	 * Returns the output operand at the given position.
+	 *
+	 * @param i zero-based output index (for transformencode: 0 = encoded matrix, 1 = meta frame)
+	 * @return the i-th output operand
+	 */
+	public CPOperand getOutput(int i) {
+		final CPOperand operand = _outputs.get(i);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {

Review comment:
       Looking at the reference `MultiReturnParameterizedBuiltinCPInstruction`
   
   It looks like the design there moves the logic entirely to the Encoder. Is there a reason not to do such a thing in this case?
   
   ```java
   
   	@Override 
   	public void processInstruction(ExecutionContext ec) {
   		//obtain and pin input frame
   		FrameBlock fin = ec.getFrameInput(input1.getName());
   		String spec = ec.getScalarInput(input2).getStringValue();
   		String[] colnames = fin.getColumnNames(); 
   		
   		//execute block transform encode
   		Encoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
   		MatrixBlock data = encoder.encode(fin, new MatrixBlock(fin.getNumRows(), fin.getNumColumns(), false)); //build and apply
   		FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING));
   		meta.setColumnNames(colnames);
   		
   		//release input and outputs
   		ec.releaseFrameInput(input1.getName());
   		ec.setMatrixOutput(getOutput(0).getName(), data);
   		ec.setFrameOutput(getOutput(1).getName(), meta);
   	}
   ```

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
##########
@@ -115,6 +131,77 @@ private FederatedResponse constructResponse(FederatedRequest request) {
 		}
 	}
 
+	private FederatedResponse createFrameEncodeMeta(FederatedRequest request) {

Review comment:
       I'm not really a fan of this method being located inside the workerHandler.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);

Review comment:
       In this call, you use outputs.get(0)
   
   I see that you just copied this from the MultiReturnParameterizedBuiltinCPInstruction, but isn't there potentially a problem in passing only the first operand of outputs to the super class, if not all methods are correctly overridden?
   
   If possible, maybe try to pass a null instead. That way we would get a NullPointerException if we encounter such an error?
   

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates the instruction.
+	 *
+	 * @param op      operator (parseInstruction passes null for transformencode)
+	 * @param input1  first input operand
+	 * @param input2  second input operand
+	 * @param outputs all output operands; the first one is also handed to the super constructor
+	 * @param opcode  instruction opcode
+	 * @param istr    original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		// the super constructor takes a single output operand, so only the first output is passed there
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		this._outputs = outputs;
+	}
+
+	/**
+	 * Returns the output operand at the given position.
+	 *
+	 * @param i zero-based output index (for transformencode: 0 = encoded matrix, 1 = meta frame)
+	 * @return the i-th output operand
+	 */
+	public CPOperand getOutput(int i) {
+		final CPOperand operand = _outputs.get(i);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// obtain and pin input frame
+		FrameObject fin = ec.getFrameObject(input1.getName());
+		String spec = ec.getScalarInput(input2).getStringValue();
+
+		Map<FederatedRange, FederatedData> fedMapping = fin.getFedMapping();
+
+		// first we use the spec to construct a meta frame which will provide us with info about the encodings
+		List<Pair<FederatedRange, Future<FederatedResponse>>> metaFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.ENCODE_META, spec, entry.getKey().getBeginDimsInt()[1] + 1), true);
+			metaFutures.add(new ImmutablePair<>(entry.getKey(), response));
+		}
+
+		// TODO support encodings other than recode
+		// the combined mappings for the frame columns (because we only support recode)

Review comment:
       I think it is important that we already think about the extensions now.
   Since you implemented it for recode, how would you extend it to the other encodings?
   If the answer is complicated, we should consider changing the code to make it simpler.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates the instruction.
+	 *
+	 * @param op      operator (parseInstruction passes null for transformencode)
+	 * @param input1  first input operand
+	 * @param input2  second input operand
+	 * @param outputs all output operands; the first one is also handed to the super constructor
+	 * @param opcode  instruction opcode
+	 * @param istr    original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		// the super constructor takes a single output operand, so only the first output is passed there
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		this._outputs = outputs;
+	}
+
+	/**
+	 * Returns the output operand at the given position.
+	 *
+	 * @param i zero-based output index (for transformencode: 0 = encoded matrix, 1 = meta frame)
+	 * @return the i-th output operand
+	 */
+	public CPOperand getOutput(int i) {
+		final CPOperand operand = _outputs.get(i);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);

Review comment:
       Maybe this null (passed as the `Operator op` argument) should be changed to a proper instance?
   (Again, it is copied from the CP instruction implementation, but maybe we can improve it.)
   

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates the instruction.
+	 *
+	 * @param op      operator (parseInstruction passes null for transformencode)
+	 * @param input1  first input operand
+	 * @param input2  second input operand
+	 * @param outputs all output operands; the first one is also handed to the super constructor
+	 * @param opcode  instruction opcode
+	 * @param istr    original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		// the super constructor takes a single output operand, so only the first output is passed there
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		this._outputs = outputs;
+	}
+
+	/**
+	 * Returns the output operand at the given position.
+	 *
+	 * @param i zero-based output index (for transformencode: 0 = encoded matrix, 1 = meta frame)
+	 * @return the i-th output operand
+	 */
+	public CPOperand getOutput(int i) {
+		final CPOperand operand = _outputs.get(i);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {

Review comment:
       In general, I would greatly appreciate it if you could split this function into several smaller functions; this will help once we need to add more functionality (single responsibility principle).

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates the instruction.
+	 *
+	 * @param op      operator (parseInstruction passes null for transformencode)
+	 * @param input1  first input operand
+	 * @param input2  second input operand
+	 * @param outputs all output operands; the first one is also handed to the super constructor
+	 * @param opcode  instruction opcode
+	 * @param istr    original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand input1, CPOperand input2,
+		ArrayList<CPOperand> outputs, String opcode, String istr) {
+		// the super constructor takes a single output operand, so only the first output is passed there
+		super(FEDType.MultiReturnParameterizedBuiltin, op, input1, input2, outputs.get(0), opcode, istr);
+		this._outputs = outputs;
+	}
+
+	/**
+	 * Returns the output operand at the given position.
+	 *
+	 * @param i zero-based output index (for transformencode: 0 = encoded matrix, 1 = meta frame)
+	 * @return the i-th output operand
+	 */
+	public CPOperand getOutput(int i) {
+		final CPOperand operand = _outputs.get(i);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// obtain and pin input frame
+		FrameObject fin = ec.getFrameObject(input1.getName());
+		String spec = ec.getScalarInput(input2).getStringValue();
+
+		Map<FederatedRange, FederatedData> fedMapping = fin.getFedMapping();
+
+		// first we use the spec to construct a meta frame which will provide us with info about the encodings
+		List<Pair<FederatedRange, Future<FederatedResponse>>> metaFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.ENCODE_META, spec, entry.getKey().getBeginDimsInt()[1] + 1), true);
+			metaFutures.add(new ImmutablePair<>(entry.getKey(), response));
+		}
+
+		// TODO support encodings other than recode
+		// the combined mappings for the frame columns (because we only support recode)
+		Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) fin.getNumColumns()];
+		try {
+			for(Pair<FederatedRange, Future<FederatedResponse>> pair : metaFutures) {
+				FederatedRange range = pair.getKey();
+				FederatedResponse federatedResponse = pair.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FrameBlock fb = (FrameBlock) federatedResponse.getData()[0];
+					combineRecodeMaps(combinedRecodeMaps, fb, range.getBeginDimsInt()[1]);
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated meta frame creation failed: " + e.getMessage());
+		}
+
+		// construct a single meta frameblock out of the multiple HashMaps with the recodings
+		FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+		// actually encode the frame block and construct an encoded matrix block at worker
+		List<Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+			FederatedRange fedRange = entry.getKey();
+			int columnStart = (int) fedRange.getBeginDims()[1];
+			int columnEnd = (int) fedRange.getEndDims()[1];
+			
+			// Slice out relevant meta part
+			// range is inclusive
+			FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() - 1, columnStart, columnEnd - 1, null);
+			
+			Future<FederatedResponse> response = entry.getValue().executeFederatedOperation(new FederatedRequest(
+				FederatedRequest.FedMethod.FRAME_ENCODE, slicedMeta, spec, columnStart + 1), true);
+			encodedFutures.add(new ImmutablePair<>(entry, response));
+		}
+		
+		// construct a federated matrix with the encoded data
+		MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+		transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+		Map<FederatedRange, FederatedData> transformedFedMapping = new HashMap<>();
+		try {
+			for(Pair<Map.Entry<FederatedRange, FederatedData>, Future<FederatedResponse>> data : encodedFutures) {
+				FederatedResponse federatedResponse = data.getValue().get();
+				if(federatedResponse.isSuccessful()) {
+					FederatedRange federatedRange = data.getKey().getKey();
+					FederatedData federatedData = data.getKey().getValue();
+					long varId = (long) federatedResponse.getData()[0];
+					
+					transformedFedMapping.put(federatedRange, new FederatedData(federatedData, varId));
+				}
+			}
+		}
+		catch(InterruptedException | ExecutionException e) {
+			throw new DMLRuntimeException("Federated transform apply failed: " + e.getMessage());
+		}
+		// set the federated mapping for the matrix
+		transformedMat.setFedMapping(transformedFedMapping);
+
+		// release input and outputs
+		ec.setFrameOutput(getOutput(1).getName(), meta);
+	}
+
+	private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] combinedRecodeMaps) {
+		int rows = 0;
+		for(Map<String, Long> map : combinedRecodeMaps) {
+			if(map != null) {
+				rows = Integer.max(rows, map.size());
+			}
+		}
+		FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, Types.ValueType.STRING);
+		fb.ensureAllocatedColumns(rows);
+
+		// find maximum number of elements needed for a column
+		int c = -1;
+		for(Map<String, Long> map : combinedRecodeMaps) {

Review comment:
       Maybe parallelize this per column?

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
+	protected final ArrayList<CPOperand> _outputs;
+
+	/**
+	 * Creates a federated multi-return parameterized builtin instruction.
+	 *
+	 * @param op      the operator (parseInstruction passes null)
+	 * @param in1     the first input operand
+	 * @param in2     the second input operand
+	 * @param outs    all output operands; the first one is handed to the superclass as its output
+	 * @param opcode  the instruction opcode
+	 * @param instStr the original instruction string
+	 */
+	private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, CPOperand in1, CPOperand in2,
+		ArrayList<CPOperand> outs, String opcode, String instStr) {
+		super(FEDType.MultiReturnParameterizedBuiltin, op, in1, in2, outs.get(0), opcode, instStr);
+		this._outputs = outs;
+	}
+
+	/**
+	 * Returns the output operand stored at the given position.
+	 *
+	 * @param index zero-based position in the output operand list
+	 * @return the output operand at {@code index}
+	 */
+	public CPOperand getOutput(int index) {
+		final CPOperand operand = _outputs.get(index);
+		return operand;
+	}
+
+	public static MultiReturnParameterizedBuiltinFEDInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		ArrayList<CPOperand> outputs = new ArrayList<>();
+		String opcode = parts[0];
+
+		if(opcode.equalsIgnoreCase("transformencode")) {
+			// one input and two outputs
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			outputs.add(new CPOperand(parts[3], Types.ValueType.FP64, Types.DataType.MATRIX));
+			outputs.add(new CPOperand(parts[4], Types.ValueType.STRING, Types.DataType.FRAME));
+			return new MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, str);
+		}
+		else {
+			throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+		}
+
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {

Review comment:
       Furthermore, consider either parallelizing the for loops that can be parallelized, or marking them as candidates for parallelization.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org