You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/10/17 17:06:17 UTC
[airavata-mft] branch master updated: Parallel distribution of batch transfer submission
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 92ed154 Parallel distribution of batch transfer submission
92ed154 is described below
commit 92ed154fe90f145eb7116e5468303a8bf88e46ce
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Oct 17 13:05:58 2022 -0400
Parallel distribution of batch transfer submission
---
.../airavata/mft/api/handler/MFTApiHandler.java | 59 +++++++++++++++++-----
1 file changed, 46 insertions(+), 13 deletions(-)
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 36aea48..1078f55 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -32,15 +32,15 @@ import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.commons.lang3.tuple.Pair;
import org.dozer.DozerBeanMapper;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
@GRpcService
public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImplBase {
@@ -92,33 +92,66 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
}
}
+ private class BatchTransferSubmitter implements Callable<Pair<Integer, String>> {
+ private int index = 0;
+ private TransferApiRequest apiRequest;
+
+ public BatchTransferSubmitter(int index, TransferApiRequest apiRequest) {
+ this.index = index;
+ this.apiRequest = apiRequest;
+ }
+
+ @Override
+ public Pair<Integer, String> call() throws Exception {
+
+ String transferId = mftConsulClient.submitTransfer(apiRequest);
+ logger.info("Submitted the transfer request {}", transferId);
+
+ mftConsulClient.saveTransferState(transferId, new TransferState()
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setState("RECEIVED").setPercentage(0)
+ .setPublisher("api")
+ .setDescription("Received transfer job " + transferId));
+
+ return Pair.of(index, transferId);
+ }
+ }
+
@Override
public void submitBatchTransfer(BatchTransferApiRequest request, StreamObserver<BatchTransferApiResponse> responseObserver) {
+ ExecutorService executorService = Executors.newFixedThreadPool(20);
+
try {
List<TransferApiRequest> transferRequests = request.getTransferRequestsList();
-
BatchTransferApiResponse.Builder responseBuilder = BatchTransferApiResponse.newBuilder();
- for (TransferApiRequest apiRequest: transferRequests) {
- String transferId = mftConsulClient.submitTransfer(apiRequest);
- logger.info("Submitted the transfer request {}", transferId);
+ ExecutorCompletionService<Pair<Integer,String>> completionService = new ExecutorCompletionService<>(executorService);
- mftConsulClient.saveTransferState(transferId, new TransferState()
- .setUpdateTimeMils(System.currentTimeMillis())
- .setState("RECEIVED").setPercentage(0)
- .setPublisher("api")
- .setDescription("Received transfer job " + transferId));
+ for (int index = 0; index < transferRequests.size(); index ++) {
+ completionService.submit(new BatchTransferSubmitter(index, transferRequests.get(index)));
+ }
- responseBuilder.addTransferIds(transferId);
+ Map<Integer, String> resultMap = new HashMap<>();
+ for (int index = 0; index < transferRequests.size(); index ++) {
+ Future<Pair<Integer, String>> futureResult = completionService.take();
+ Pair<Integer, String> result = futureResult.get();
+ resultMap.put(result.getLeft(), result.getRight());
+ }
+
+ for (int index = 0; index < transferRequests.size(); index ++) {
+ responseBuilder.addTransferIds(resultMap.get(index));
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
+
} catch (Exception e) {
logger.error("Error in submitting batch transfer request", e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to submit batch transfer request. " + e.getMessage())
.asException());
+ } finally {
+ executorService.shutdown();
}
}