You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/11 16:18:50 UTC
[airavata-data-lake] branch master updated: Bug fix drms
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new caa27c4 Bug fix drms
new 16b56bb Merge pull request #24 from isururanawaka/workflow_invocation
caa27c4 is described below
commit caa27c47f8ec8cba905f09926ac6f113fe0669e5
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Sun Jul 11 12:17:49 2021 -0400
Bug fix drms
---
.../orchestrator/connectors/DRMSConnector.java | 121 ++++++++++-----------
.../processor/OutboundEventProcessor.java | 28 ++++-
.../drms-rest-proxy/src/main/resources/drms.pb | Bin 109344 -> 109314 bytes
.../src/main/proto/storage/StorageService.proto | 6 +-
4 files changed, 83 insertions(+), 72 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index f87542e..4f9cbab 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -28,7 +28,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
private ManagedChannel drmsChannel;
private ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub;
- private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub storagePreferenceServiceBlockingStub;
+ private StorageServiceGrpc.StorageServiceBlockingStub storageServiceBlockingStub;
public DRMSConnector(Configuration configuration) throws Exception {
this.init(configuration);
@@ -40,7 +40,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
.forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
this.resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(drmsChannel);
- this.storagePreferenceServiceBlockingStub = StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
+ this.storageServiceBlockingStub = StorageServiceGrpc.newBlockingStub(drmsChannel);
}
@@ -54,67 +54,62 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
-// public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-// DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-// .setAccessToken(entity.getAuthToken())
-// .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-// .setUsername(entity.getOwnerId())
-// .setTenantId(entity.getTenantId())
-// .build())
-// .build();
-// FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-// .setAuthToken(serviceAuthToken)
-// .build();
-// FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-// List<TransferMapping> transferMappingList = response.getMappingsList();
-// AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-// if (!transferMappingList.isEmpty()) {
-// transferMappingList.forEach(transferMapping -> {
-// if (transferMapping.getSourceStoragePreference().getStorageCase()
-// .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-// if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
-// .getStorage().getHostName().equals(hostname)) {
-// storagePreferenceId
-// .set(transferMapping.getSourceStoragePreference()
-// .getSshStoragePreference().getStoragePreferenceId());
-// }
-// }
-// });
-// }
-// return Optional.ofNullable(storagePreferenceId.get());
-// }
-//
-// public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-// DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-// .setAccessToken(entity.getAuthToken())
-// .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-// .setUsername(entity.getOwnerId())
-// .setTenantId(entity.getTenantId())
-// .build())
-// .build();
-// FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-// .setAuthToken(serviceAuthToken)
-// .build();
-// FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-// List<TransferMapping> transferMappingList = response.getMappingsList();
-// AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-// if (!transferMappingList.isEmpty()) {
-// transferMappingList.forEach(transferMapping -> {
-// if (transferMapping.getDestinationStoragePreference().getStorageCase()
-// .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-// if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
-// .getStorage().getHostName().equals(hostname)) {
-// storagePreferenceId
-// .set(transferMapping.getDestinationStoragePreference()
-// .getSshStoragePreference().getStoragePreferenceId());
-// }
-// }
-// });
-// }
-// return Optional.ofNullable(storagePreferenceId.get());
-// }
+ public Optional<TransferMapping> getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response = storageServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<TransferMapping> transferMappingOp = new AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getSourceStorage().getStorageCase()
+ .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if (transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname)) {
+ transferMappingOp.set(transferMapping);
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(transferMappingOp.get());
+ }
+
+ public Optional<String> getDestinationStorageId(DataOrchestratorEntity entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response = storageServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getDestinationStorage().getStorageCase()
+ .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if (transferMapping.getDestinationStorage().getSshStorage().getHostName().equals(hostname)) {
+ storagePreferenceId
+ .set(transferMapping.getDestinationStorage().getSshStorage().getStorageId());
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(storagePreferenceId.get());
+ }
public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 013ee59..522d8a7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,6 +1,7 @@
package org.apache.airavata.datalake.orchestrator.processor;
import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
@@ -89,27 +90,28 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
String[] collections = tail.split("/");
-// Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
- Optional<String> optionalStorPref = null;
+ Optional<TransferMapping> optionalStorPref = drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
if (optionalStorPref.isEmpty()) {
entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("StoragePreference not found for host: " + entity.getHostName());
+ entity.setError("Storage not found for host: " + entity.getHostName());
repository.save(entity);
return;
}
- String parentId = optionalStorPref.get();
+ TransferMapping transferMapping = optionalStorPref.get();
+ String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
+ String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+ String parentId = sourceStorageId;
for (int i = 1; i < collections.length - 1; i++) {
String resourceName = collections[i];
String path = entity.getResourcePath().substring(0, entity.getResourcePath().indexOf(resourceName));
path = path.concat(resourceName);
String entityId = Utils.getId(path);
Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, parentId, "COLLECTION");
+ this.drmsConnector.createResource(repository, entity, entityId, resourceName, path, sourceStorageId, "COLLECTION");
if (optionalGenericResource.isPresent()) {
parentId = optionalGenericResource.get().getResourceId();
-
} else {
entity.setEventStatus(EventStatus.ERRORED.name());
entity.setError("Collection structure creation failed: " + entity.getHostName());
@@ -123,6 +125,20 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
collections[collections.length - 1], entity.getResourcePath(),
parentId, "FILE");
+ String dstResourceHost = transferMapping.getDestinationStorage().getSshStorage().getHostName();
+ String destinationResourceId = dstResourceHost+":"+ entity.getResourcePath() + ":" + entity.getResourceType();
+ String messageId = Utils.getId(destinationResourceId);
+
+ Optional<GenericResource> destinationFile = this.drmsConnector.createResource(repository, entity, messageId,
+ entity.getResourceName(),
+ entity.getResourcePath(),
+ destinationStorageId,
+ "FILE");
+
+ Optional<GenericResource> optionalGenericResourceDST =
+ this.drmsConnector.createResource(repository, entity, messageId,
+ collections[collections.length - 1], entity.getResourcePath(),
+ destinationStorageId, "FILE");
if (optionalGenericResource.isPresent()) {
this.workflowServiceConnector.invokeWorkflow(repository, entity, optionalGenericResource.get());
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 0227833..1b745c1 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index 3f5b3a2..bd89026 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -167,19 +167,19 @@ service StorageService {
rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) {
option (google.api.http) = {
- post: "/v1.0/api/drms/storagePreference/transferMapping"
+ post: "/v1.0/api/drms/storage/transferMapping"
};
}
rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) {
option (google.api.http) = {
- get: "/v1.0/api/drms/storagePreference/transferMapping"
+ get: "/v1.0/api/drms/storage/transferMapping"
};
}
rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
- delete: "/v1.0/api/drms/storagePreference/transferMapping"
+ delete: "/v1.0/api/drms/storage/transferMapping"
};
}
}
\ No newline at end of file