Posted to dev@systemds.apache.org by GitBox <gi...@apache.org> on 2021/05/04 14:39:07 UTC

[GitHub] [systemds] Baunsgaard commented on a change in pull request #1237: [SYSTEMDS-2604] Federated Output for AggregateBinaryFEDInstruction

Baunsgaard commented on a change in pull request #1237:
URL: https://github.com/apache/systemds/pull/1237#discussion_r625831261



##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
##########
@@ -548,27 +550,54 @@ protected MatrixBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
 		throws IOException
 	{
 		// TODO sparse optimization
-		MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
 		List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = fedMap.requestFederatedData();
 		try {
-			for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
-				FederatedRange range = readResponse.getLeft();
-				FederatedResponse response = readResponse.getRight().get();
-				// add result
-				int[] beginDimsInt = range.getBeginDimsInt();
-				int[] endDimsInt = range.getEndDimsInt();
-				MatrixBlock multRes = (MatrixBlock) response.getData()[0];
-				ret.copy(beginDimsInt[0], endDimsInt[0] - 1,
-					beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
-				ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+			if ( fedMap.getType() == FederationMap.FType.PART )
+				return aggregateResponses(readResponses);
+			else {
+				return bindResponses(readResponses, dims);
 			}
 		}
-		catch (Exception e) {
+		catch(Exception e) {
 			throw new DMLRuntimeException("Federated matrix read failed.", e);
 		}
-		
+	}
+
+	/**
+	 * Bind data from federated workers based on non-overlapping federated ranges.
+	 * @param readResponses responses from federated workers containing the federated ranges and data
+	 * @param dims dimensions of output MatrixBlock
+	 * @return MatrixBlock of consolidated data
+	 * @throws Exception in case of problems with getting data from responses
+	 */
+	private MatrixBlock bindResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses, long[] dims)
+	throws Exception {
+		MatrixBlock ret = new MatrixBlock((int) dims[0], (int) dims[1], false);
+		for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
+			FederatedRange range = readResponse.getLeft();
+			FederatedResponse response = readResponse.getRight().get();
+			// add result
+			int[] beginDimsInt = range.getBeginDimsInt();
+			int[] endDimsInt = range.getEndDimsInt();
+			MatrixBlock multRes = (MatrixBlock) response.getData()[0];
+			ret.copy(beginDimsInt[0], endDimsInt[0] - 1, beginDimsInt[1], endDimsInt[1] - 1, multRes, false);
+			ret.setNonZeros(ret.getNonZeros() + multRes.getNonZeros());
+		}
 		return ret;
 	}
+
+	/**
+	 * Aggregate partially aggregated data from federated workers
+	 * by adding values with the same index in different federated locations.
+	 * @param readResponses responses from federated workers containing the federated data
+	 * @return MatrixBlock of consolidated, aggregated data
+	 */
+	private MatrixBlock aggregateResponses(List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses) {
+		List<Future<FederatedResponse>> dataParts = new ArrayList<>();
+		for ( Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses )
+			dataParts.add(readResponse.getValue());
+		return FederationUtils.aggAdd(dataParts.toArray(new Future[0]));

Review comment:
       I like it; it is basically the same as what I'm doing with overlapping in compression.
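
   A minimal sketch of the overlap idea, assuming every site returns a partial result with the full output dimensions and that addition is the aggregation method. It uses plain double[][] arrays instead of MatrixBlock, and the class and its aggAdd helper are hypothetical stand-ins, not the FederationUtils.aggAdd from the diff:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: consolidate overlapping partial results by cell-wise addition.
class OverlapAggregationSketch {

    // Sums equally sized partial results cell by cell (illustrative helper only).
    static double[][] aggAdd(List<double[][]> parts) {
        if (parts.isEmpty() || parts.get(0).length == 0)
            throw new IllegalArgumentException("need at least one non-empty partial result");
        int rows = parts.get(0).length;
        int cols = parts.get(0)[0].length;
        double[][] ret = new double[rows][cols];
        for (double[][] part : parts) {
            if (part.length != rows || part[0].length != cols)
                throw new IllegalArgumentException("partial results must share dimensions");
            for (int i = 0; i < rows; i++)
                for (int j = 0; j < cols; j++)
                    ret[i][j] += part[i][j];
        }
        return ret;
    }

    public static void main(String[] args) {
        double[][] siteA = {{1, 2}, {3, 4}};
        double[][] siteB = {{10, 20}, {30, 40}};
        // Both sites cover the same index range, so the values are added, not bound.
        System.out.println(Arrays.deepToString(aggAdd(List.of(siteA, siteB))));
        // prints [[11.0, 22.0], [33.0, 44.0]]
    }
}
```

   Unlike the bind case, no FederatedRange is needed here, because every partial result addresses the same cells.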

##########
File path: src/main/java/org/apache/sysds/lops/MatMultCP.java
##########
@@ -73,15 +73,15 @@ public String toString() {
 	@Override
 	public String getInstructions(String input1, String input2, String output) {
 		if(!useTranspose) {
-			return InstructionUtils.concatOperands(getExecType().name(),
+			InstructionUtils.concatBaseOperands(getExecType().name(),

Review comment:
       I don't like this change because it hides where the variable is.
   Also, I'm not sure it works, since strings in Java are immutable and so behave differently from other variables.
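
   A small sketch of the immutability concern, assuming the worry is that a helper cannot change the caller's String unless the result is returned or a mutable buffer is shared. All names below are hypothetical and say nothing about how InstructionUtils is actually implemented:

```java
// Hypothetical sketch: a String argument cannot be modified by the callee.
class StringImmutabilitySketch {

    // Rebinds only the local parameter; the caller's String is unchanged.
    static void appendInPlace(String s, String suffix) {
        s = s + suffix;
    }

    // Makes the data flow explicit by returning the new String.
    static String appendAndReturn(String s, String suffix) {
        return s + suffix;
    }

    // Works without a return value, but only because StringBuilder is mutable.
    static void appendTo(StringBuilder sb, String suffix) {
        sb.append(suffix);
    }

    public static void main(String[] args) {
        String inst = "CP";
        appendInPlace(inst, " ba+*");
        System.out.println(inst); // prints CP

        System.out.println(appendAndReturn(inst, " ba+*")); // prints CP ba+*

        StringBuilder sb = new StringBuilder("CP");
        appendTo(sb, " ba+*");
        System.out.println(sb); // prints CP ba+*
    }
}
```

   The variant that returns the String keeps it visible at the call site where the instruction string comes from; the StringBuilder variant relies on shared mutable state.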

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -81,6 +88,10 @@ public long getSize() {
 			size *= getSize(i);
 		return size;
 	}
+
+	public int getOverlapNum(){
+		return _overlapNum;

Review comment:
       In my experience, it does not matter how many overlaps you have; a boolean specifying whether there is overlap is enough.

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRange.java
##########
@@ -94,12 +105,19 @@ public int compareTo(FederatedRange o) {
 			if ( _beginDims[i] > o._beginDims[i])
 				return 1;
 		}
+		if (_overlapNum < o._overlapNum)
+			return -1;
+		if (_overlapNum > o._overlapNum)
+			return 1;

Review comment:
       How is it relevant which index it is?
   If you are trying to keep the overlaps in a certain order, that is really not a needed feature.
   But then again, if it really is necessary to sort the federated ranges, I guess this is an okay way.
   
   If you change _overlapNum to a boolean, you could sort on object pointers instead in the last case.
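
   A rough sketch of the boolean variant, assuming ranges are ordered by their begin dimensions first and the overlap flag is only a tie-break. The class, its fields, and its constructor are hypothetical and much smaller than the real FederatedRange. Java does not expose object pointers; System.identityHashCode would be the closest substitute, but it is not guaranteed to be unique, so the sketch stops at the flag:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: ordering of ranges with a boolean overlap flag.
class FederatedRangeSketch implements Comparable<FederatedRangeSketch> {
    final long[] beginDims;
    final boolean overlap; // assumption: replaces the int _overlapNum from the diff

    FederatedRangeSketch(long[] beginDims, boolean overlap) {
        this.beginDims = beginDims;
        this.overlap = overlap;
    }

    @Override
    public int compareTo(FederatedRangeSketch o) {
        // Primary order: begin dimensions, compared one dimension at a time.
        for (int i = 0; i < beginDims.length; i++) {
            int c = Long.compare(beginDims[i], o.beginDims[i]);
            if (c != 0)
                return c;
        }
        // Tie-break: non-overlapping ranges sort before overlapping ones.
        return Boolean.compare(overlap, o.overlap);
    }

    public static void main(String[] args) {
        List<FederatedRangeSketch> ranges = new ArrayList<>();
        ranges.add(new FederatedRangeSketch(new long[] {5, 0}, false));
        ranges.add(new FederatedRangeSketch(new long[] {0, 0}, true));
        ranges.add(new FederatedRangeSketch(new long[] {0, 0}, false));
        Collections.sort(ranges);
        for (FederatedRangeSketch r : ranges)
            System.out.println(r.beginDims[0] + "," + r.beginDims[1] + " overlap=" + r.overlap);
    }
}
```

   Two overlapping ranges with identical begin dimensions compare as equal in this sketch, which is fine for sorting a list but would collapse them in a TreeMap or TreeSet.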

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -50,6 +50,7 @@
 		ROW, // row partitioned, groups of rows
 		COL, // column partitioned, groups of columns
 		FULL, // Meaning both Row and Column indicating a single federated location and a full matrix
+		PART, // Partial aggregates in several federated locations with addition as the aggregation method

Review comment:
       Call it Overlap, so as not to confuse (at least me). This overlap is a concept I have in compression as well, where I call it ... overlap.

##########
File path: src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
##########
@@ -189,9 +190,14 @@ public FederatedRequest broadcast(ScalarObject scalar) {
 		return ret;
 	}
 
+	/**
+	 * Determines if the two federation maps are aligned row/column partitions
+	 * at the same federated sites (which allows for purely federated operation)
+	 * @param that FederationMap to check alignment with
+	 * @param transposed true if that FederationMap should be transposed before checking alignment
+	 * @return true if this and that FederationMap are aligned

Review comment:
       Thanks for the documentation!



