You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/08/23 15:05:50 UTC

[airavata-data-lake] branch master updated: Avoiding the same event concurrently running

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new c407744  Avoiding the same event concurrently running
c407744 is described below

commit c40774424d066d1c5d5639c0f8924fbe81113401
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Aug 23 11:05:40 2021 -0400

    Avoiding the same event concurrently running
---
 .../handlers/async/OrchestratorEventHandler.java        | 17 ++++++++++++++---
 .../handlers/async/OrchestratorEventProcessor.java      |  7 ++++++-
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index d6d25bc..d706c16 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -21,9 +21,10 @@ import org.apache.airavata.datalake.orchestrator.Configuration;
 import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -40,6 +41,7 @@ public class OrchestratorEventHandler {
     private ExecutorService executorService;
     private ScheduledExecutorService ouboundExecutorService;
     private MessageConsumer messageConsumer;
+    private final Set<String> eventCache = new HashSet<>();
 
     public OrchestratorEventHandler() {
     }
@@ -56,9 +58,18 @@ public class OrchestratorEventHandler {
 
     public void startProcessing() throws Exception {
         messageConsumer.consume((notificationEvent -> {
-            LOGGER.info("Message received for resource path {}", notificationEvent.getResourcePath());
+            LOGGER.info("Message received for resource path {} type {}", notificationEvent.getResourcePath(),
+                    notificationEvent.getEventType());
             try {
-                this.executorService.submit(new OrchestratorEventProcessor(configuration, notificationEvent));
+                if (!eventCache.contains(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName())) {
+                    eventCache.add(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+                    this.executorService.submit(new OrchestratorEventProcessor(
+                            configuration, notificationEvent, eventCache));
+                } else {
+                    LOGGER.info("Event is already processing");
+                }
+
+
             } catch (Exception e) {
                 LOGGER.error("Failed tu submit data orchestrator event to process on path {}",
                         notificationEvent.getResourcePath(), e);
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 6fc7d45..af8bac0 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -46,9 +46,12 @@ public class OrchestratorEventProcessor implements Runnable {
     private DRMSConnector drmsConnector;
     private Configuration configuration;
     private WorkflowServiceConnector workflowServiceConnector;
+    private final Set<String> eventCache;
 
-    public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent) throws Exception {
+    public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
+                                      Set<String> eventCache) throws Exception {
         this.notificationEvent = notificationEvent;
+        this.eventCache = eventCache;
         this.drmsConnector = new DRMSConnector(configuration);
         this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
         this.configuration = configuration;
@@ -255,6 +258,8 @@ public class OrchestratorEventProcessor implements Runnable {
 
         } catch (Exception e) {
             logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+        } finally {
+            this.eventCache.remove(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
         }
     }
 }