You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/11/14 21:27:51 UTC
[systemds] branch master updated: [SYSTEMDS-2630] Multi-threaded
slicing in sliced federated broadcasts
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new acd33e1 [SYSTEMDS-2630] Multi-threaded slicing in sliced federated broadcasts
acd33e1 is described below
commit acd33e166b756cdf5884c89b735089f17ccfb2c6
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Nov 14 22:26:08 2020 +0100
[SYSTEMDS-2630] Multi-threaded slicing in sliced federated broadcasts
For the case of broadcasting a large, potentially sparse, matrix in a
sliced manner (where every federated partition receives only the needed
data), so far we sliced the blocks sequentially. With this patch, we
simply do this independent block slicing in a multi-threaded manner in
order to avoid unnecessary overhead in case of large broadcasts.
---
.../controlprogram/federated/FederationMap.java | 28 ++++++++++++++++++----
.../instructions/fed/TsmmFEDInstruction.java | 1 -
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index b647476..f670c17 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -104,20 +104,38 @@ public class FederationMap
return new FederatedRequest(RequestType.PUT_VAR, id, scalar);
}
+ /**
+ * Creates separate slices of an input data object according
+ * to the index ranges of federated data. Theses slices are then
+ * wrapped in separate federated requests for broadcasting.
+ *
+ * @param data input data object (matrix, tensor, frame)
+ * @param transposed false: slice according to federated data,
+ * true: slice according to transposed federated data
+ * @return array of federated requests corresponding to federated data
+ */
public FederatedRequest[] broadcastSliced(CacheableData<?> data, boolean transposed) {
- //prepare separate requests for different slices
+ //prepare broadcast id and pin input
long id = FederationUtils.getNextFedDataID();
CacheBlock cb = data.acquireReadAndRelease();
- List<FederatedRequest> ret = new ArrayList<>();
+
+ //prepare indexing ranges
+ int[][] ix = new int[_fedMap.size()][];
+ int pos = 0;
for(Entry<FederatedRange, FederatedData> e : _fedMap.entrySet()) {
int rl = transposed ? 0 : e.getKey().getBeginDimsInt()[0];
int ru = transposed ? cb.getNumRows()-1 : e.getKey().getEndDimsInt()[0]-1;
int cl = transposed ? e.getKey().getBeginDimsInt()[0] : 0;
int cu = transposed ? e.getKey().getEndDimsInt()[0]-1 : cb.getNumColumns()-1;
- CacheBlock tmp = cb.slice(rl, ru, cl, cu, new MatrixBlock());
- ret.add(new FederatedRequest(RequestType.PUT_VAR, id, tmp));
+ ix[pos++] = new int[] {rl, ru, cl, cu};
}
- return ret.toArray(new FederatedRequest[0]);
+
+ //multi-threaded block slicing and federation request creation
+ FederatedRequest[] ret = new FederatedRequest[ix.length];
+ Arrays.parallelSetAll(ret, i ->
+ new FederatedRequest(RequestType.PUT_VAR, id,
+ cb.slice(ix[i][0], ix[i][1], ix[i][2], ix[i][3], new MatrixBlock())));
+ return ret;
}
public boolean isAligned(FederationMap that, boolean transposed) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
index ed9615f..62438c0 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import java.util.Arrays;
import java.util.concurrent.Future;
public class TsmmFEDInstruction extends BinaryFEDInstruction {