You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/01/31 21:36:13 UTC
[airavata] branch staging-temp updated: Adding user id to adaptor cache
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging-temp
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging-temp by this push:
new 3709aaa Adding user id to adaptor cache
3709aaa is described below
commit 3709aaaef543c1c46d5951b8a7c02156e15b02a7
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 31 16:36:05 2019 -0500
Adding user id to adaptor cache
---
.../core/support/adaptor/AdaptorSupportImpl.java | 30 ++++++++----
.../helix/core/support/adaptor/AgentStore.java | 55 +++++++++++++---------
2 files changed, 55 insertions(+), 30 deletions(-)
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
index 105459f..be09630 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AdaptorSupportImpl.java
@@ -25,6 +25,8 @@ import org.apache.airavata.helix.adaptor.SSHJStorageAdaptor;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Optional;
@@ -36,6 +38,8 @@ import java.util.Optional;
*/
public class AdaptorSupportImpl implements AdaptorSupport {
+ private final static Logger logger = LoggerFactory.getLogger(AdaptorSupportImpl.class);
+
private static AdaptorSupportImpl INSTANCE;
private final AgentStore agentStore = new AgentStore();
@@ -52,26 +56,30 @@ public class AdaptorSupportImpl implements AdaptorSupport {
public void initializeAdaptor() {
}
- public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, JobSubmissionProtocol protocol, String authToken, String userId) throws AgentException {
+ public AgentAdaptor fetchAdaptor(String gatewayId, String computeResourceId, JobSubmissionProtocol protocol, String authToken, String userId) throws AgentException {
- Optional<AgentAdaptor> agentAdaptorOp = agentStore.getAgentAdaptor(computeResource, protocol, authToken);
+ Optional<AgentAdaptor> agentAdaptorOp = agentStore.getAgentAdaptor(computeResourceId, protocol, authToken, userId);
if (agentAdaptorOp.isPresent()) {
+ logger.debug("Re using the adaptor for gateway " + gatewayId + ", compute resource " +
+ computeResourceId + ", protocol " + protocol + " , user " + userId);
return agentAdaptorOp.get();
} else {
synchronized (this) {
- agentAdaptorOp = agentStore.getAgentAdaptor(computeResource, protocol, authToken);
+ agentAdaptorOp = agentStore.getAgentAdaptor(computeResourceId, protocol, authToken, userId);
if (agentAdaptorOp.isPresent()) {
return agentAdaptorOp.get();
} else {
+ logger.debug("Could not find an adaptor for gateway " + gatewayId + ", compute resource " +
+ computeResourceId + ", protocol " + protocol + " , user " + userId + ". Creating new one");
switch (protocol) {
case SSH:
SSHJAgentAdaptor agentAdaptor = new SSHJAgentAdaptor();
- agentAdaptor.init(computeResource, gatewayId, userId, authToken);
- agentStore.putAgentAdaptor(computeResource, protocol, authToken, agentAdaptor);
+ agentAdaptor.init(computeResourceId, gatewayId, userId, authToken);
+ agentStore.putAgentAdaptor(computeResourceId, protocol, authToken, userId, agentAdaptor);
return agentAdaptor;
default:
throw new AgentException("Could not find an agent adaptor for gateway " + gatewayId +
- ", compute resource " + computeResource + ", protocol " + protocol + " , user " + userId);
+ ", compute resource " + computeResourceId + ", protocol " + protocol + " , user " + userId);
}
}
}
@@ -80,20 +88,24 @@ public class AdaptorSupportImpl implements AdaptorSupport {
@Override
public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, DataMovementProtocol protocol, String authToken, String userId) throws AgentException {
- Optional<StorageResourceAdaptor> agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken);
+ Optional<StorageResourceAdaptor> agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken, userId);
if (agentAdaptorOp.isPresent()) {
+ logger.debug("Re using the storage adaptor for gateway " + gatewayId + ", storage resource " +
+ storageResourceId + ", protocol " + protocol + " , user " + userId);
return agentAdaptorOp.get();
} else {
synchronized (this) {
- agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken);
+ agentAdaptorOp = agentStore.getStorageAdaptor(storageResourceId, protocol, authToken, userId);
if (agentAdaptorOp.isPresent()) {
return agentAdaptorOp.get();
} else {
+ logger.debug("Could not find a storage adaptor for gateway " + gatewayId + ", storage resource " +
+ storageResourceId + ", protocol " + protocol + " , user " + userId + ". Creating new one");
switch (protocol) {
case SCP:
SSHJStorageAdaptor storageResourceAdaptor = new SSHJStorageAdaptor();
storageResourceAdaptor.init(storageResourceId, gatewayId, userId, authToken);
- agentStore.putStorageAdaptor(storageResourceId, protocol, authToken, storageResourceAdaptor);
+ agentStore.putStorageAdaptor(storageResourceId, protocol, authToken, userId, storageResourceAdaptor);
return storageResourceAdaptor;
default:
throw new AgentException("Could not find an storage adaptor for gateway " + gatewayId +
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
index cd79cf0..b66cbe6 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/adaptor/AgentStore.java
@@ -23,6 +23,7 @@ import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.zookeeper.Op;
import java.util.HashMap;
import java.util.Map;
@@ -36,16 +37,21 @@ import java.util.Optional;
*/
public class AgentStore {
- // compute resource: Job submission protocol: auth token: adaptor
- private final Map<String, Map<JobSubmissionProtocol, Map<String, AgentAdaptor>>> agentAdaptorCache = new HashMap<>();
- private final Map<String, Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>>> storageAdaptorCache = new HashMap<>();
+ // compute resource: Job submission protocol: auth token: user: adaptor
+ private final Map<String, Map<JobSubmissionProtocol, Map<String, Map<String, AgentAdaptor>>>> agentAdaptorCache = new HashMap<>();
+ private final Map<String, Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>>> storageAdaptorCache = new HashMap<>();
- public Optional<AgentAdaptor> getAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken) {
- Map<JobSubmissionProtocol, Map<String, AgentAdaptor>> protoToTokenMap = agentAdaptorCache.get(computeResource);
+ public Optional<AgentAdaptor> getAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, String userId) {
+ Map<JobSubmissionProtocol, Map<String, Map<String, AgentAdaptor>>> protoToTokenMap = agentAdaptorCache.get(computeResource);
if (protoToTokenMap != null) {
- Map<String, AgentAdaptor> tokenToAdaptorMap = protoToTokenMap.get(submissionProtocol);
- if (tokenToAdaptorMap != null) {
- return Optional.ofNullable(tokenToAdaptorMap.get(authToken));
+ Map<String, Map<String,AgentAdaptor>> tokenToUserMap = protoToTokenMap.get(submissionProtocol);
+ if (tokenToUserMap != null) {
+ Map<String, AgentAdaptor> userToAdaptorMap = tokenToUserMap.get(authToken);
+ if (userToAdaptorMap != null) {
+ return Optional.ofNullable(userToAdaptorMap.get(userId));
+ } else {
+ return Optional.empty();
+ }
} else {
return Optional.empty();
}
@@ -54,18 +60,24 @@ public class AgentStore {
}
}
- public void putAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, AgentAdaptor agentAdaptor) {
- Map<JobSubmissionProtocol, Map<String, AgentAdaptor>> protoToTokenMap = agentAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
- Map<String, AgentAdaptor> tokenToAdaptorMap = protoToTokenMap.computeIfAbsent(submissionProtocol, k -> new HashMap<>());
- tokenToAdaptorMap.put(authToken, agentAdaptor);
+ public void putAgentAdaptor(String computeResource, JobSubmissionProtocol submissionProtocol, String authToken, String userId, AgentAdaptor agentAdaptor) {
+ Map<JobSubmissionProtocol, Map<String, Map<String,AgentAdaptor>>> protoToTokenMap = agentAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
+ Map<String, Map<String,AgentAdaptor>> tokenToUserMap = protoToTokenMap.computeIfAbsent(submissionProtocol, k -> new HashMap<>());
+ Map<String, AgentAdaptor> userToAdaptorMap = tokenToUserMap.computeIfAbsent(authToken, k-> new HashMap<>());
+ userToAdaptorMap.put(userId, agentAdaptor);
}
- public Optional<StorageResourceAdaptor> getStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken) {
- Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>> protoToTokenMap = storageAdaptorCache.get(computeResource);
+ public Optional<StorageResourceAdaptor> getStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, String userId) {
+ Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>> protoToTokenMap = storageAdaptorCache.get(computeResource);
if (protoToTokenMap != null) {
- Map<String, StorageResourceAdaptor> tokenToAdaptorMap = protoToTokenMap.get(dataMovementProtocol);
- if (tokenToAdaptorMap != null) {
- return Optional.ofNullable(tokenToAdaptorMap.get(authToken));
+ Map<String, Map<String, StorageResourceAdaptor>> tokenToUserMap = protoToTokenMap.get(dataMovementProtocol);
+ if (tokenToUserMap != null) {
+ Map<String, StorageResourceAdaptor> userToAdaptorMap = tokenToUserMap.get(authToken);
+ if (userToAdaptorMap != null) {
+ return Optional.ofNullable(userToAdaptorMap.get(userId));
+ } else {
+ return Optional.empty();
+ }
} else {
return Optional.empty();
}
@@ -74,10 +86,11 @@ public class AgentStore {
}
}
- public void putStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, StorageResourceAdaptor storageResourceAdaptor) {
- Map<DataMovementProtocol, Map<String, StorageResourceAdaptor>> protoToTokenMap = storageAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
- Map<String, StorageResourceAdaptor> tokenToAdaptorMap = protoToTokenMap.computeIfAbsent(dataMovementProtocol, k -> new HashMap<>());
- tokenToAdaptorMap.put(authToken, storageResourceAdaptor);
+ public void putStorageAdaptor(String computeResource, DataMovementProtocol dataMovementProtocol, String authToken, String userId, StorageResourceAdaptor storageResourceAdaptor) {
+ Map<DataMovementProtocol, Map<String, Map<String, StorageResourceAdaptor>>> protoToTokenMap = storageAdaptorCache.computeIfAbsent(computeResource, k -> new HashMap<>());
+ Map<String, Map<String, StorageResourceAdaptor>> tokenToUserMap = protoToTokenMap.computeIfAbsent(dataMovementProtocol, k -> new HashMap<>());
+ Map<String, StorageResourceAdaptor> userToAdaptorMap = tokenToUserMap.computeIfAbsent(authToken, k -> new HashMap<>());
+ userToAdaptorMap.put(userId, storageResourceAdaptor);
}
}