You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/23 15:05:50 UTC
[airavata-data-lake] branch master updated: Avoiding the same event concurrently running
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new c407744 Avoiding the same event concurrently running
c407744 is described below
commit c40774424d066d1c5d5639c0f8924fbe81113401
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Aug 23 11:05:40 2021 -0400
Avoiding the same event concurrently running
---
.../handlers/async/OrchestratorEventHandler.java | 17 ++++++++++++++---
.../handlers/async/OrchestratorEventProcessor.java | 7 ++++++-
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index d6d25bc..d706c16 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -21,9 +21,10 @@ import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -40,6 +41,7 @@ public class OrchestratorEventHandler {
private ExecutorService executorService;
private ScheduledExecutorService ouboundExecutorService;
private MessageConsumer messageConsumer;
+ private final Set<String> eventCache = new HashSet<>();
public OrchestratorEventHandler() {
}
@@ -56,9 +58,18 @@ public class OrchestratorEventHandler {
public void startProcessing() throws Exception {
messageConsumer.consume((notificationEvent -> {
- LOGGER.info("Message received for resource path {}", notificationEvent.getResourcePath());
+ LOGGER.info("Message received for resource path {} type {}", notificationEvent.getResourcePath(),
+ notificationEvent.getEventType());
try {
- this.executorService.submit(new OrchestratorEventProcessor(configuration, notificationEvent));
+ if (!eventCache.contains(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName())) {
+ eventCache.add(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+ this.executorService.submit(new OrchestratorEventProcessor(
+ configuration, notificationEvent, eventCache));
+ } else {
+ LOGGER.info("Event is already processing");
+ }
+
+
} catch (Exception e) {
LOGGER.error("Failed tu submit data orchestrator event to process on path {}",
notificationEvent.getResourcePath(), e);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 6fc7d45..af8bac0 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -46,9 +46,12 @@ public class OrchestratorEventProcessor implements Runnable {
private DRMSConnector drmsConnector;
private Configuration configuration;
private WorkflowServiceConnector workflowServiceConnector;
+ private final Set<String> eventCache;
- public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent) throws Exception {
+ public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
+ Set<String> eventCache) throws Exception {
this.notificationEvent = notificationEvent;
+ this.eventCache = eventCache;
this.drmsConnector = new DRMSConnector(configuration);
this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
this.configuration = configuration;
@@ -255,6 +258,8 @@ public class OrchestratorEventProcessor implements Runnable {
} catch (Exception e) {
logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+ } finally {
+ this.eventCache.remove(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
}
}
}