You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/05 20:00:37 UTC

[airavata-data-lake] branch master updated: Bug fix in ResourceServiceHandler DRMS and DataOrch OutboundEvent Processor

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf920e  Bug fix in ResourceServiceHandler DRMS and DataOrch OutboundEvent Processor
     new d6a16ed  Merge pull request #14 from isururanawaka/workflow_merge
9cf920e is described below

commit 9cf920e5904cd376e7d94b633a3ce1193117802f
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Mon Jul 5 15:57:41 2021 -0400

    Bug fix in ResourceServiceHandler DRMS and DataOrch OutboundEvent Processor
---
 .../registry/persistance/EventStatus.java          |  2 +-
 .../handlers/OrchestratorEventHandler.java         |  6 ---
 .../processor/InboundEventProcessor.java           |  3 --
 .../processor/OutboundEventProcessor.java          | 57 ++++++++++++++++------
 .../src/main/proto/service/WorkflowService.proto   |  2 +-
 .../drms/api/handlers/ResourceServiceHandler.java  |  2 +-
 .../airavata/drms/api/utils/CustosUtils.java       |  7 +--
 7 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
index c58c452..24bafbe 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
@@ -2,7 +2,7 @@ package org.apache.airavata.datalake.orchestrator.registry.persistance;
 
 public enum EventStatus {
     DATA_ORCH_RECEIVED,
-    WORKFLOW_LAUNCHED,
+    DISPATCHED_TO_WORFLOW_ENGING,
     DATA_ORCH_PROCESSED_AND_SKIPPED,
     MFT_CALLBACK_RECEIVED,
     COMPLETED,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
index 30c8679..2c5600e 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
@@ -1,7 +1,6 @@
 package org.apache.airavata.datalake.orchestrator.handlers;
 
 import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
 import org.apache.airavata.datalake.orchestrator.processor.InboundEventProcessor;
 import org.apache.airavata.datalake.orchestrator.processor.OutboundEventProcessor;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -27,10 +26,7 @@ public class OrchestratorEventHandler {
 
     private ExecutorService executorService;
     private ScheduledExecutorService ouboundExecutorService;
-    private MessageProcessor messageProcessor;
     private MessageConsumer messageConsumer;
-    private OutboundEventProcessor outboundEventProcessor;
-
 
     @Autowired
     private DataOrchestratorEventRepository dataOrchestratorEventRepository;
@@ -47,7 +43,6 @@ public class OrchestratorEventHandler {
                 configuration.getConsumer().getConsumerGroup(),
                 configuration.getConsumer().getMaxPollRecordsConfig(),
                 configuration.getConsumer().getTopic());
-        this.outboundEventProcessor = new OutboundEventProcessor(configuration, dataOrchestratorEventRepository);
 
     }
 
@@ -62,7 +57,6 @@ public class OrchestratorEventHandler {
         this.ouboundExecutorService
                 .scheduleAtFixedRate(new OutboundEventProcessor(configuration, dataOrchestratorEventRepository),
                         0, configuration.getOutboundEventProcessor().getPollingInterval(), TimeUnit.SECONDS);
-
     }
 
     public Configuration getConfiguration() {
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
index dd46a7f..92fc28b 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
@@ -1,7 +1,6 @@
 package org.apache.airavata.datalake.orchestrator.processor;
 
 import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.core.adaptors.StorageAdaptor;
 import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -27,8 +26,6 @@ import java.util.regex.Pattern;
  */
 public class InboundEventProcessor implements MessageProcessor {
     private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
-    private StorageAdaptor store;
-
     private Configuration configuration;
     private NotificationEvent notificationEvent;
     private DozerBeanMapper dozerBeanMapper;
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index a5626ee..7c9e7cc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -45,6 +45,7 @@ public class OutboundEventProcessor implements MessageProcessor {
 
     public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
         this.repository = repository;
+        //convert these to SSL
         this.workflowChannel = ManagedChannelBuilder
                 .forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
                         configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
@@ -89,8 +90,6 @@ public class OutboundEventProcessor implements MessageProcessor {
             entityMap.forEach((key, value) -> {
                 DataOrchestratorEntity entity = value.remove(0);
                 processEvent(entity);
-                entity.setEventStatus(EventStatus.WORKFLOW_LAUNCHED.name());
-                repository.save(entity);
                 value.forEach(val -> {
                     val.setEventStatus(EventStatus.DATA_ORCH_PROCESSED_AND_SKIPPED.name());
                     repository.save(val);
@@ -104,7 +103,6 @@ public class OutboundEventProcessor implements MessageProcessor {
 
     private void processEvent(DataOrchestratorEntity entity) {
         try {
-
             DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
                     .setAccessToken(entity.getAuthToken())
                     .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
@@ -141,21 +139,52 @@ public class OutboundEventProcessor implements MessageProcessor {
                         .setAuthToken(serviceAuthToken)
                         .setResource(genericResource)
                         .build();
-                ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
-                GenericResource resource = resourceCreateResponse.getResource();
-
-                WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken.newBuilder().setAccessToken("").build();
-                WorkflowMessage workflowMessage = WorkflowMessage.newBuilder().setResourceId(resource.getResourceId()).build();
-
-                WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
-                        .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
-                this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+                GenericResource resource = null;
+                try {
+                    ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
+                    resource = resourceCreateResponse.getResource();
+                } catch (Exception ex) {
+                    LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
+                    entity.setEventStatus(EventStatus.ERRORED.name());
+                    entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
+                    repository.save(entity);
+                    return;
+                }
+
+                try {
+                    WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
+                            .newBuilder()
+                            .setAccessToken("")
+                            .build();
+                    WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
+                            .setResourceId(resource.getResourceId())
+                            .build();
+
+                    WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
+                            .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+                    this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+                } catch (Exception ex) {
+                    LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+                    entity.setEventStatus(EventStatus.ERRORED.name());
+                    entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+                    repository.save(entity);
+                    return;
+                }
+            } else {
+                LOGGER.error("Incorrect storage preference  {}", entity.getStoragePreferenceId());
+                entity.setEventStatus(EventStatus.ERRORED.name());
+                entity.setError("Incorrect storage preference " + entity.getStoragePreferenceId());
+                repository.save(entity);
+                return;
             }
+            entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
+            repository.save(entity);
         } catch (Exception exception) {
             LOGGER.error("Error occurred while processing outbound data orcehstrator event", exception);
-            throw new RuntimeException(exception);
+            entity.setEventStatus(EventStatus.ERRORED.name());
+            entity.setError("Error occurred while processing ");
+            repository.save(entity);
         }
     }
 
-
 }
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index 64cecad..a3557b1 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -30,7 +30,7 @@ message WorkflowMessage {
 }
 
 message WorkflowInvocationRequest {
-    org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken authToken = 1;
+    org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken auth_token = 1;
     WorkflowMessage message = 2;
 }
 
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index d3ff585..ad345de 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -126,7 +126,7 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
             if (request.getResource().getStoragePreferenceCase()
                     .equals(GenericResource.StoragePreferenceCase.S3_PREFERENCE)) {
                 storagePreferenceId = request.getResource().getS3Preference().getStoragePreferenceId();
-            } else if (request.getResource().getStoragePreferenceCase().name()
+            } else if (request.getResource().getStoragePreferenceCase()
                     .equals(GenericResource.StoragePreferenceCase.SSH_PREFERENCE)) {
                 storagePreferenceId = request.getResource().getSshPreference().getStoragePreferenceId();
             }
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
index 78256df..fee7583 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
@@ -102,12 +102,7 @@ public class CustosUtils {
         Status status = sharingManagementClient.isEntityExists(tenantId, entity);
         if (!status.getStatus()) {
             sharingManagementClient.createEntity(tenantId, entity);
-            return Optional.ofNullable(sharingManagementClient.getEntity(tenantId, entity));
         }
-
-        return Optional.empty();
-
+        return Optional.ofNullable(sharingManagementClient.getEntity(tenantId, entity));
     }
-
-
 }