You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/31 21:36:13 UTC

[airavata] branch staging-temp updated: Adding user id to adaptor cache

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging-temp
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging-temp by this push:
     new 3709aaa  Adding user id to adaptor cache
3709aaa is described below

commit 3709aaaef543c1c46d5951b8a7c02156e15b02a7
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 31 16:36:05 2019 -0500

    Adding user id to adaptor cache
---
 .../core/support/adaptor/AdaptorSupportImpl.java   | 30 ++++++++----
 .../helix/core/support/adaptor/AgentStore.java     | 55 +++++++++++++---------
 2 files changed, 55 insertions(+), 30 deletions(-)

diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
index 105459f..be09630 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
@@ -25,6 +25,8 @@ import org.apache.airavata.helix.adaptor.SSHJStorageAdaptor;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
@@ -36,6 +38,8 @@ import java.util.Optional;
  */
 public class AdaptorSupportImpl implements AdaptorSupport {
 
+    private final static Logger logger = LoggerFactory.getLogger(AdaptorSupportImpl.class);
+
     private static AdaptorSupportImpl INSTANCE;
 
     private final AgentStore agentStore = new AgentStore();
@@ -52,26 +56,30 @@ public class AdaptorSupportImpl implements AdaptorSupport {
     public void initializeAdaptor() {
     }
 
-    public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, JobSubmissionProtocol protocol, String authToken, String userId) throws AgentException {
+    public AgentAdaptor fetchAdaptor(String gatewayId, String computeResourceId, JobSubmissionProtocol protocol, String authToken, String userId) throws AgentException {
 
-        Optional<AgentAdaptor> agentAdaptorOp = agentStore.getAgentAdaptor(computeResource, protocol, authToken);
+        Optional<AgentAdaptor> agentAdaptorOp = agentStore.getAgentAdaptor(computeResourceId, protocol, authToken, userId);
         if (agentAdaptorOp.isPresent()) {
+            logger.debug("Re using the adaptor for gateway " + gatewayId + ", compute resource " +
+                    computeResourceId + ", protocol " + protocol + " , user " + userId);
             return agentAdaptorOp.get();
         } else {
             synchronized (this) {
-                agentAdaptorOp = agentStore.getAgentAdaptor(computeResource, protocol, authToken);
+                agentAdaptorOp = agentStore.getAgentAdaptor(computeResourceId, protocol, authToken, userId);
                 if (agentAdaptorOp.isPresent()) {
                     return agentAdaptorOp.get();
                 } else {
+                    logger.debug("Could not find an adaptor for gateway " + gatewayId + ", compute resource " +
+                            computeResourceId + ", protocol " + protocol + " , user " + userId + ". Creating new one");
                     switch (protocol) {
                         case SSH:
                             SSHJAgentAdaptor agentAdaptor = new SSHJAgentAdaptor();
-                            agentAdaptor.init(computeResource, gatewayId, userId, authToken);
-                            agentStore.putAgentAdaptor(computeResource, protocol, authToken, agentAdaptor);
+                            agentAdaptor.init(computeResourceId, gatewayId, userId, authToken);
+                            agentStore.putAgentAdaptor(computeResourceId, protocol, authToken, userId, agentAdaptor);
                             return agentAdaptor;
                         default:
                             throw new AgentException("Could not find an agent adaptor for gateway " + gatewayId +
-                                    ", compute resource " + computeResource + ", protocol " + protocol + " , user " + userId);
+                                    ", compute resource " + computeResourceId + ", protocol " + protocol + " , user " + userId);
                     }
                 }
             }
@@ -80,20 +88,24 @@ public class AdaptorSupportImpl implements AdaptorSupport {
 
     @Override
     public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, DataMovementProtocol protocol, String authToken, String userId) throws AgentException {
-        Optional<StorageResourceAdaptor> agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken);
+        Optional<StorageResourceAdaptor> agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken, userId);
         if (agentAdaptorOp.isPresent()) {
+            logger.debug("Re using the storage adaptor for gateway " + gatewayId + ", storage resource " +
+                    storageResourceId + ", protocol " + protocol + " , user " + userId);
             return agentAdaptorOp.get();
         } else {
             synchronized (this) {
-                agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken);
+                agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken, userId);
                 if (agentAdaptorOp.isPresent()) {
                     return agentAdaptorOp.get();
                 } else {
+                    logger.debug("Could not find a storage adaptor for gateway " + gatewayId + ", storage resource " +
+                            storageResourceId + ", protocol " + protocol + " , user " + userId + ". Creating new one");
                     switch (protocol) {
                         case SCP:
                             SSHJStorageAdaptor storageResourceAdaptor = new SSHJStorageAdaptor();
                             storageResourceAdaptor.init(storageResourceId, gatewayId, userId, authToken);
-                            agentStore.putStorageAdaptor(storageResourceId, protocol, authToken, storageResourceAdaptor);
+                            agentStore.putStorageAdaptor(storageResourceId, protocol, authToken, userId, storageResourceAdaptor);
                             return storageResourceAdaptor;
                         default:
                             throw new AgentException("Could not find an storage adaptor for gateway " + gatewayId +
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
index cd79cf0..b66cbe6 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
@@ -23,6 +23,7 @@ import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.zookeeper.Op;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,16 +37,21 @@ import java.util.Optional;
  */
 public class AgentStore {
 
-    // compute resource: Job submission protocol: auth token: adaptor
-    private final Map<String, Map<JobSubmissionProtocol, Map<String, AgentAdaptor>>> agentAdaptorCache = new HashMap<>();
-    private final Map<String, Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>>> storageAdaptorCache = new HashMap<>();
+    // compute resource: Job submission protocol: auth token: user: adaptor
+    private final Map<String, Map<JobSubmissionProtocol, Map<String, Map<String, AgentAdaptor>>>> agentAdaptorCache = new HashMap<>();
+    private final Map<String, Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>>> storageAdaptorCache = new HashMap<>();
 
-    public Optional<AgentAdaptor> getAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken) {
-        Map<JobSubmissionProtocol, Map<String, AgentAdaptor>> protoToTokenMap = agentAdaptorCache.get(computeResource);
+    public Optional<AgentAdaptor> getAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, String userId) {
+        Map<JobSubmissionProtocol, Map<String, Map<String, AgentAdaptor>>> protoToTokenMap = agentAdaptorCache.get(computeResource);
         if (protoToTokenMap != null) {
-            Map<String, AgentAdaptor> tokenToAdaptorMap = protoToTokenMap.get(submissionProtocol);
-            if (tokenToAdaptorMap != null) {
-                return Optional.ofNullable(tokenToAdaptorMap.get(authToken));
+            Map<String, Map<String,AgentAdaptor>> tokenToUserMap = protoToTokenMap.get(submissionProtocol);
+            if (tokenToUserMap != null) {
+                Map<String, AgentAdaptor> userToAdaptorMap = tokenToUserMap.get(authToken);
+                if (userToAdaptorMap != null) {
+                    return Optional.ofNullable(userToAdaptorMap.get(userId));
+                } else {
+                    return Optional.empty();
+                }
             } else {
                 return Optional.empty();
             }
@@ -54,18 +60,24 @@ public class AgentStore {
         }
     }
 
-    public void putAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, AgentAdaptor agentAdaptor) {
-        Map<JobSubmissionProtocol, Map<String, AgentAdaptor>> protoToTokenMap = agentAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
-        Map<String, AgentAdaptor> tokenToAdaptorMap = protoToTokenMap.computeIfAbsent(submissionProtocol, k -> new HashMap<>());
-        tokenToAdaptorMap.put(authToken, agentAdaptor);
+    public void putAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, String userId, AgentAdaptor agentAdaptor) {
+        Map<JobSubmissionProtocol, Map<String, Map<String,AgentAdaptor>>> protoToTokenMap = agentAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
+        Map<String, Map<String,AgentAdaptor>> tokenToUserMap = protoToTokenMap.computeIfAbsent(submissionProtocol, k -> new HashMap<>());
+        Map<String, AgentAdaptor> userToAdaptorMap = tokenToUserMap.computeIfAbsent(authToken, k-> new HashMap<>());
+        userToAdaptorMap.put(userId, agentAdaptor);
     }
 
-    public Optional<StorageResourceAdaptor> getStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken) {
-        Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>> protoToTokenMap = storageAdaptorCache.get(computeResource);
+    public Optional<StorageResourceAdaptor> getStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, String userId) {
+        Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>> protoToTokenMap = storageAdaptorCache.get(computeResource);
         if (protoToTokenMap != null) {
-            Map<String, StorageResourceAdaptor> tokenToAdaptorMap = protoToTokenMap.get(dataMovementProtocol);
-            if (tokenToAdaptorMap != null) {
-                return Optional.ofNullable(tokenToAdaptorMap.get(authToken));
+            Map<String, Map<String, StorageResourceAdaptor>> tokenToUserMap = protoToTokenMap.get(dataMovementProtocol);
+            if (tokenToUserMap != null) {
+                Map<String, StorageResourceAdaptor> userToAdaptorMap = tokenToUserMap.get(authToken);
+                if (userToAdaptorMap != null) {
+                    return Optional.ofNullable(userToAdaptorMap.get(userId));
+                } else {
+                    return Optional.empty();
+                }
             } else {
                 return Optional.empty();
             }
@@ -74,10 +86,11 @@ public class AgentStore {
         }
     }
 
-    public void putStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, StorageResourceAdaptor storageResourceAdaptor) {
-        Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>> protoToTokenMap = storageAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
-        Map<String, StorageResourceAdaptor> tokenToAdaptorMap = protoToTokenMap.computeIfAbsent(dataMovementProtocol, k -> new HashMap<>());
-        tokenToAdaptorMap.put(authToken, storageResourceAdaptor);
+    public void putStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, String userId, StorageResourceAdaptor storageResourceAdaptor) {
+        Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>> protoToTokenMap = storageAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
+        Map<String, Map<String, StorageResourceAdaptor>> tokenToUserMap = protoToTokenMap.computeIfAbsent(dataMovementProtocol, k -> new HashMap<>());
+        Map<String, StorageResourceAdaptor> userToAdaptorMap = tokenToUserMap.computeIfAbsent(authToken, k -> new HashMap<>());
+        userToAdaptorMap.put(userId, storageResourceAdaptor);
     }
 
 }