You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/05 20:00:37 UTC
[airavata-data-lake] branch master updated: Bug fix in
ResourceServiceHandler DRMS and DataOrch Outboundevent Prcoessor
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 9cf920e Bug fix in ResourceServiceHandler DRMS and DataOrch Outboundevent Prcoessor
new d6a16ed Merge pull request #14 from isururanawaka/workflow_merge
9cf920e is described below
commit 9cf920e5904cd376e7d94b633a3ce1193117802f
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Mon Jul 5 15:57:41 2021 -0400
Bug fix in ResourceServiceHandler DRMS and DataOrch Outboundevent Prcoessor
---
.../registry/persistance/EventStatus.java | 2 +-
.../handlers/OrchestratorEventHandler.java | 6 ---
.../processor/InboundEventProcessor.java | 3 --
.../processor/OutboundEventProcessor.java | 57 ++++++++++++++++------
.../src/main/proto/service/WorkflowService.proto | 2 +-
.../drms/api/handlers/ResourceServiceHandler.java | 2 +-
.../airavata/drms/api/utils/CustosUtils.java | 7 +--
7 files changed, 47 insertions(+), 32 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
index c58c452..24bafbe 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/EventStatus.java
@@ -2,7 +2,7 @@ package org.apache.airavata.datalake.orchestrator.registry.persistance;
public enum EventStatus {
DATA_ORCH_RECEIVED,
- WORKFLOW_LAUNCHED,
+ DISPATCHED_TO_WORFLOW_ENGING,
DATA_ORCH_PROCESSED_AND_SKIPPED,
MFT_CALLBACK_RECEIVED,
COMPLETED,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
index 30c8679..2c5600e 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/OrchestratorEventHandler.java
@@ -1,7 +1,6 @@
package org.apache.airavata.datalake.orchestrator.handlers;
import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
import org.apache.airavata.datalake.orchestrator.processor.InboundEventProcessor;
import org.apache.airavata.datalake.orchestrator.processor.OutboundEventProcessor;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -27,10 +26,7 @@ public class OrchestratorEventHandler {
private ExecutorService executorService;
private ScheduledExecutorService ouboundExecutorService;
- private MessageProcessor messageProcessor;
private MessageConsumer messageConsumer;
- private OutboundEventProcessor outboundEventProcessor;
-
@Autowired
private DataOrchestratorEventRepository dataOrchestratorEventRepository;
@@ -47,7 +43,6 @@ public class OrchestratorEventHandler {
configuration.getConsumer().getConsumerGroup(),
configuration.getConsumer().getMaxPollRecordsConfig(),
configuration.getConsumer().getTopic());
- this.outboundEventProcessor = new OutboundEventProcessor(configuration, dataOrchestratorEventRepository);
}
@@ -62,7 +57,6 @@ public class OrchestratorEventHandler {
this.ouboundExecutorService
.scheduleAtFixedRate(new OutboundEventProcessor(configuration, dataOrchestratorEventRepository),
0, configuration.getOutboundEventProcessor().getPollingInterval(), TimeUnit.SECONDS);
-
}
public Configuration getConfiguration() {
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
index dd46a7f..92fc28b 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
@@ -1,7 +1,6 @@
package org.apache.airavata.datalake.orchestrator.processor;
import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.core.adaptors.StorageAdaptor;
import org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
import org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
@@ -27,8 +26,6 @@ import java.util.regex.Pattern;
*/
public class InboundEventProcessor implements MessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(InboundEventProcessor.class);
- private StorageAdaptor store;
-
private Configuration configuration;
private NotificationEvent notificationEvent;
private DozerBeanMapper dozerBeanMapper;
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index a5626ee..7c9e7cc 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -45,6 +45,7 @@ public class OutboundEventProcessor implements MessageProcessor {
public OutboundEventProcessor(Configuration configuration, DataOrchestratorEventRepository repository) throws Exception {
this.repository = repository;
+ //convert these to SSL
this.workflowChannel = ManagedChannelBuilder
.forAddress(configuration.getOutboundEventProcessor().getWorkflowEngineHost(),
configuration.getOutboundEventProcessor().getWorkflowPort()).usePlaintext().build();
@@ -89,8 +90,6 @@ public class OutboundEventProcessor implements MessageProcessor {
entityMap.forEach((key, value) -> {
DataOrchestratorEntity entity = value.remove(0);
processEvent(entity);
- entity.setEventStatus(EventStatus.WORKFLOW_LAUNCHED.name());
- repository.save(entity);
value.forEach(val -> {
val.setEventStatus(EventStatus.DATA_ORCH_PROCESSED_AND_SKIPPED.name());
repository.save(val);
@@ -104,7 +103,6 @@ public class OutboundEventProcessor implements MessageProcessor {
private void processEvent(DataOrchestratorEntity entity) {
try {
-
DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
.setAccessToken(entity.getAuthToken())
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
@@ -141,21 +139,52 @@ public class OutboundEventProcessor implements MessageProcessor {
.setAuthToken(serviceAuthToken)
.setResource(genericResource)
.build();
- ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
- GenericResource resource = resourceCreateResponse.getResource();
-
- WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken.newBuilder().setAccessToken("").build();
- WorkflowMessage workflowMessage = WorkflowMessage.newBuilder().setResourceId(resource.getResourceId()).build();
-
- WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
- .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
- this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+ GenericResource resource = null;
+ try {
+ ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createResource(resourceCreateRequest);
+ resource = resourceCreateResponse.getResource();
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while creating resource {} in DRMS", entity.getResourceId(), ex);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Error occurred while creating resource in DRMS " + ex.getMessage());
+ repository.save(entity);
+ return;
+ }
+
+ try {
+ WorkflowServiceAuthToken workflowServiceAuthToken = WorkflowServiceAuthToken
+ .newBuilder()
+ .setAccessToken("")
+ .build();
+ WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
+ .setResourceId(resource.getResourceId())
+ .build();
+
+ WorkflowInvocationRequest workflowInvocationRequest = WorkflowInvocationRequest
+ .newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+ this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while invoking workflow engine", entity.getResourceId(), ex);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Error occurred while invoking workflow engine" + ex.getMessage());
+ repository.save(entity);
+ return;
+ }
+ } else {
+ LOGGER.error("Incorrect storage preference {}", entity.getStoragePreferenceId());
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Incorrect storage preference " + entity.getStoragePreferenceId());
+ repository.save(entity);
+ return;
}
+ entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
+ repository.save(entity);
} catch (Exception exception) {
LOGGER.error("Error occurred while processing outbound data orcehstrator event", exception);
- throw new RuntimeException(exception);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ entity.setError("Error occurred while processing ");
+ repository.save(entity);
}
}
-
}
diff --git a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index 64cecad..a3557b1 100644
--- a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -30,7 +30,7 @@ message WorkflowMessage {
}
message WorkflowInvocationRequest {
- org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken authToken = 1;
+ org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken auth_token = 1;
WorkflowMessage message = 2;
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index d3ff585..ad345de 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -126,7 +126,7 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
if (request.getResource().getStoragePreferenceCase()
.equals(GenericResource.StoragePreferenceCase.S3_PREFERENCE)) {
storagePreferenceId = request.getResource().getS3Preference().getStoragePreferenceId();
- } else if (request.getResource().getStoragePreferenceCase().name()
+ } else if (request.getResource().getStoragePreferenceCase()
.equals(GenericResource.StoragePreferenceCase.SSH_PREFERENCE)) {
storagePreferenceId = request.getResource().getSshPreference().getStoragePreferenceId();
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
index 78256df..fee7583 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/utils/CustosUtils.java
@@ -102,12 +102,7 @@ public class CustosUtils {
Status status = sharingManagementClient.isEntityExists(tenantId, entity);
if (!status.getStatus()) {
sharingManagementClient.createEntity(tenantId, entity);
- return Optional.ofNullable(sharingManagementClient.getEntity(tenantId, entity));
}
-
- return Optional.empty();
-
+ return Optional.ofNullable(sharingManagementClient.getEntity(tenantId, entity));
}
-
-
}