You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/10/17 17:06:17 UTC

[airavata-mft] branch master updated: Parallel distribution of batch transfer submission

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ed154  Parallel distribution of batch transfer submission
92ed154 is described below

commit 92ed154fe90f145eb7116e5468303a8bf88e46ce
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Oct 17 13:05:58 2022 -0400

    Parallel distribution of batch transfer submission
---
 .../airavata/mft/api/handler/MFTApiHandler.java    | 59 +++++++++++++++++-----
 1 file changed, 46 insertions(+), 13 deletions(-)

diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 36aea48..1078f55 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -32,15 +32,15 @@ import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
 import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.commons.lang3.tuple.Pair;
 import org.dozer.DozerBeanMapper;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
 
 @GRpcService
 public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImplBase {
@@ -92,33 +92,66 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
         }
     }
 
+    private class BatchTransferSubmitter implements Callable<Pair<Integer, String>> {
+        private int index = 0;
+        private TransferApiRequest apiRequest;
+
+        public BatchTransferSubmitter(int index, TransferApiRequest apiRequest) {
+            this.index = index;
+            this.apiRequest = apiRequest;
+        }
+
+        @Override
+        public Pair<Integer, String> call() throws Exception {
+
+            String transferId = mftConsulClient.submitTransfer(apiRequest);
+            logger.info("Submitted the transfer request {}", transferId);
+
+            mftConsulClient.saveTransferState(transferId, new TransferState()
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setState("RECEIVED").setPercentage(0)
+                    .setPublisher("api")
+                    .setDescription("Received transfer job " + transferId));
+
+            return Pair.of(index, transferId);
+        }
+    }
+
     @Override
     public void submitBatchTransfer(BatchTransferApiRequest request, StreamObserver<BatchTransferApiResponse> responseObserver) {
+        ExecutorService executorService = Executors.newFixedThreadPool(20);
+
         try {
             List<TransferApiRequest> transferRequests = request.getTransferRequestsList();
-
             BatchTransferApiResponse.Builder responseBuilder = BatchTransferApiResponse.newBuilder();
-            for (TransferApiRequest apiRequest: transferRequests) {
-                String transferId = mftConsulClient.submitTransfer(apiRequest);
 
-                logger.info("Submitted the transfer request {}", transferId);
+            ExecutorCompletionService<Pair<Integer,String>> completionService = new ExecutorCompletionService<>(executorService);
 
-                mftConsulClient.saveTransferState(transferId, new TransferState()
-                        .setUpdateTimeMils(System.currentTimeMillis())
-                        .setState("RECEIVED").setPercentage(0)
-                        .setPublisher("api")
-                        .setDescription("Received transfer job " + transferId));
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                completionService.submit(new BatchTransferSubmitter(index, transferRequests.get(index)));
+            }
 
-                responseBuilder.addTransferIds(transferId);
+            Map<Integer, String> resultMap = new HashMap<>();
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                Future<Pair<Integer, String>> futureResult = completionService.take();
+                Pair<Integer, String> result = futureResult.get();
+                resultMap.put(result.getLeft(), result.getRight());
+            }
+
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                responseBuilder.addTransferIds(resultMap.get(index));
             }
 
             responseObserver.onNext(responseBuilder.build());
             responseObserver.onCompleted();
+
         } catch (Exception e) {
             logger.error("Error in submitting batch transfer request", e);
             responseObserver.onError(Status.INTERNAL
                     .withDescription("Failed to submit batch transfer request. " + e.getMessage())
                     .asException());
+        } finally {
+            executorService.shutdown();
         }
     }