You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/05 21:30:06 UTC

[airavata-data-lake] branch master updated: Persisting notification and events in the database. Implementing the grpc service

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new 501b613  Persisting notification and events in the database. Implementing the grpc service
501b613 is described below

commit 501b613f1366a7b60400f25fba5c79104c161c95
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sun Sep 5 17:29:57 2021 -0400

    Persisting notification and events in the database. Implementing the grpc service
---
 .../templates/data-orchestrator/config.yml.j2      |   2 +
 ansible/roles/workflow-engine/tasks/main.yml       |   7 ++
 .../data-orchestrator-clients-core/pom.xml         |   5 +
 .../clients/core/NotificationClient.java           |  45 ++++++++
 .../entity/notification/NotificationEntity.java    | 128 +++++++++++++++++++++
 .../notification/NotificationStatusEntity.java     |  93 +++++++++++++++
 .../NotificationEntityRepository.java}             |  12 +-
 .../NotificationStatusEntityRepository.java}       |  12 +-
 .../data-orchestrator-api-server/pom.xml           |   5 +
 .../datalake/orchestrator/Configuration.java       |  23 +++-
 .../handlers/async/OrchestratorEventHandler.java   |  25 +++-
 .../handlers/async/OrchestratorEventProcessor.java | 118 ++++++++++++-------
 .../handlers/grpc/NotificationApiHandler.java      |  88 ++++++++++++++
 .../src/main/proto/notification.proto              |  96 ++++++++++++++++
 14 files changed, 591 insertions(+), 68 deletions(-)

diff --git a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2 b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
index f64e465..7816641 100644
--- a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
+++ b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
@@ -12,6 +12,8 @@ outboundEventProcessor:
   drmsPort: {{ datalake_drms_grpc_port }}
   mftHost: "{{ mft_api_service_host }}"
   mftPort: {{ mft_api_service_grpc_port }}
+  notificationServiceHost: "localhost"
+  notificationServicePort: {{ datalake_data_orch_grpc_port }}
 consumer:
   brokerURL: "{{ datalake_data_orch_broker_url }}"
   consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
diff --git a/ansible/roles/workflow-engine/tasks/main.yml b/ansible/roles/workflow-engine/tasks/main.yml
index 856fdc6..f5f3071 100644
--- a/ansible/roles/workflow-engine/tasks/main.yml
+++ b/ansible/roles/workflow-engine/tasks/main.yml
@@ -15,6 +15,13 @@
   become: yes
   become_user: "{{ user }}"
 
+- name: open firewall ports for Workflow Engine
+  firewalld: port="{{ item }}/tcp"
+    zone=public permanent=true state=enabled immediate=yes
+  with_items:
+    - "{{ workflow_manager_grpc_port }}"
+  become: yes
+
 - name: Run Datalake maven build
   command: mvn clean install -Dmaven.test.skip=true chdir="{{ datalake_source_dir }}/"
   environment:
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
index 6ec8b80..e3cb9f9 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
@@ -22,6 +22,11 @@
             <artifactId>data-orchestrator-messaging</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-api-stub</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java
new file mode 100644
index 0000000..9c3d076
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.dataorchestrator.clients.core;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationServiceGrpc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thin client that owns a plaintext gRPC channel to the notification service and hands out
+ * blocking stubs bound to it.
+ */
+public class NotificationClient implements Closeable {
+
+    /** Upper bound, in seconds, that {@link #close()} waits for in-flight calls to drain. */
+    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
+
+    private final ManagedChannel channel;
+
+    /**
+     * Builds a plaintext (non-TLS) channel to the given endpoint.
+     *
+     * @param hostName host of the notification service
+     * @param port     gRPC port of the notification service
+     */
+    public NotificationClient(String hostName, int port) {
+        channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
+    }
+
+    /**
+     * @return a new blocking stub backed by this client's shared channel
+     */
+    public NotificationServiceGrpc.NotificationServiceBlockingStub get() {
+        return NotificationServiceGrpc.newBlockingStub(channel);
+    }
+
+    /**
+     * Shuts the channel down and waits briefly for it to terminate; calls still running after
+     * the timeout are cancelled so the channel's resources are released.
+     *
+     * @throws IOException declared for {@link Closeable}; not thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+        // channel is final and always assigned in the constructor, so no null check is needed.
+        channel.shutdown();
+        try {
+            if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                channel.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for the caller and stop waiting.
+            channel.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java
new file mode 100644
index 0000000..3737aa7
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+@Entity
+@Table(name = "NOTIFICATION_ENTITY")
+public class NotificationEntity {
+    // JPA entity backed by the NOTIFICATION_ENTITY table; rows are keyed by NOTIFICATION_ID.
+    @Id
+    @Column(name = "NOTIFICATION_ID")
+    private String notificationId; // primary key, held as a String
+
+    @Column(name = "RESOURCE_PATH")
+    private String resourcePath;
+
+    @Column(name = "RESOURCE_TYPE")
+    private String resourceType; // stored as free-form text, not as an enum
+
+    @Column(name = "OCCURED_TIME")
+    private long occuredTime; // primitive long; NOTE(review): presumably epoch millis - confirm with the producer
+
+    @Column(name = "AUTH_TOKEN")
+    private String authToken; // NOTE(review): the token is persisted as a plain column value - confirm this is acceptable
+
+    @Column(name = "TENANT_ID")
+    private String tenantId;
+
+    @Column(name = "HOSTNAME")
+    private String hostName;
+
+    @Column(name = "BASE_PATH")
+    private String basePath; // NOTE(review): presumably a prefix of RESOURCE_PATH - confirm against the event producer
+
+    @Column(name = "EVENT_TYPE")
+    private String eventType; // stored as free-form text, not as an enum
+
+    public String getNotificationId() {
+        return notificationId;
+    }
+
+    public void setNotificationId(String notificationId) {
+        this.notificationId = notificationId;
+    }
+
+    public String getResourcePath() {
+        return resourcePath;
+    }
+
+    public void setResourcePath(String resourcePath) {
+        this.resourcePath = resourcePath;
+    }
+
+    public String getResourceType() {
+        return resourceType;
+    }
+
+    public void setResourceType(String resourceType) {
+        this.resourceType = resourceType;
+    }
+
+    public long getOccuredTime() {
+        return occuredTime;
+    }
+
+    public void setOccuredTime(long occuredTime) {
+        this.occuredTime = occuredTime;
+    }
+
+    public String getAuthToken() {
+        return authToken;
+    }
+
+    public void setAuthToken(String authToken) {
+        this.authToken = authToken;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public void setTenantId(String tenantId) {
+        this.tenantId = tenantId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public void setBasePath(String basePath) {
+        this.basePath = basePath;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public void setEventType(String eventType) {
+        this.eventType = eventType;
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java
new file mode 100644
index 0000000..9a4fb89
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;
+
+import javax.persistence.*;
+
+@Entity
+@Table(name = "NOTIFICATION_STATUS_ENTITY")
+public class NotificationStatusEntity {
+    // JPA entity backed by NOTIFICATION_STATUS_ENTITY; each row carries the NOTIFICATION_ID it refers to.
+    @Id
+    @Column(name = "STATUS_ID")
+    private String statusId; // primary key, held as a String
+
+    @Column(name = "STATUS")
+    private String status; // status value kept as text, not as an enum
+
+    @Column(name = "PUBLISHED_TIME")
+    private long publishedTime; // primitive long; NOTE(review): presumably epoch millis - confirm with the writer
+
+    @Column(name = "DESCRIPTION", columnDefinition = "TEXT")
+    private String description; // columnDefinition asks for a TEXT column rather than the default string type
+
+    @Column(name = "NOTIFICATION_ID")
+    private String notificationId; // writable mapping of the NOTIFICATION_ID column
+    // Second, read-only mapping of NOTIFICATION_ID: insertable/updatable=false leave column writes to notificationId.
+    @ManyToOne(fetch = FetchType.LAZY)
+    @JoinColumn(name = "NOTIFICATION_ID", insertable=false, updatable=false)
+    private NotificationEntity notificationEntity;
+
+    public String getStatusId() {
+        return statusId;
+    }
+
+    public void setStatusId(String statusId) {
+        this.statusId = statusId;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public long getPublishedTime() {
+        return publishedTime;
+    }
+
+    public void setPublishedTime(long publishedTime) {
+        this.publishedTime = publishedTime;
+    }
+
+    public NotificationEntity getNotificationEntity() {
+        return notificationEntity;
+    }
+
+    public void setNotificationEntity(NotificationEntity notificationEntity) {
+        this.notificationEntity = notificationEntity;
+    }
+
+    public String getNotificationId() {
+        return notificationId;
+    }
+
+    public void setNotificationId(String notificationId) {
+        this.notificationId = notificationId;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
similarity index 75%
copy from data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
copy to data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
index 0df50f3..a14d5a0 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
@@ -15,14 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
+package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
 
-public enum EventStatus {
-    DATA_ORCH_RECEIVED,
-    DISPATCHED_TO_WORFLOW_ENGING,
-    DATA_ORCH_PROCESSED_AND_SKIPPED,
-    MFT_CALLBACK_RECEIVED,
-    COMPLETED,
-    ERRORED,
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
+import org.springframework.data.jpa.repository.JpaRepository;
 
+public interface NotificationEntityRepository extends JpaRepository<NotificationEntity, String> {
 }
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
similarity index 73%
rename from data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
rename to data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
index 0df50f3..565cbb2 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
@@ -15,14 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
+package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
 
-public enum EventStatus {
-    DATA_ORCH_RECEIVED,
-    DISPATCHED_TO_WORFLOW_ENGING,
-    DATA_ORCH_PROCESSED_AND_SKIPPED,
-    MFT_CALLBACK_RECEIVED,
-    COMPLETED,
-    ERRORED,
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
+import org.springframework.data.jpa.repository.JpaRepository;
 
+public interface NotificationStatusEntityRepository extends JpaRepository<NotificationStatusEntity, String> {
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
index c7e5291..9b7d5be 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
@@ -82,6 +82,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-clients-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
             <artifactId>data-orchestrator-registry</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
index 8e6ca47..0f33438 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
@@ -165,11 +165,10 @@ public class Configuration {
         private int pollingInterval;
         private String mftHost;
         private int mftPort;
+        private String notificationServiceHost;
+        private int notificationServicePort;
 
-
-        public OutboundEventProcessorConfig() {
-        }
-
+        public OutboundEventProcessorConfig() {}
 
         public String getWorkflowEngineHost() {
             return workflowEngineHost;
@@ -226,6 +225,22 @@ public class Configuration {
         public void setMftPort(int mftPort) {
             this.mftPort = mftPort;
         }
+
+        public String getNotificationServiceHost() { // host used to open the notification service gRPC channel
+            return notificationServiceHost;
+        }
+
+        public void setNotificationServiceHost(String notificationServiceHost) { // NOTE(review): presumably bound from the notificationServiceHost config key - confirm
+            this.notificationServiceHost = notificationServiceHost;
+        }
+
+        public int getNotificationServicePort() { // port used to open the notification service gRPC channel
+            return notificationServicePort;
+        }
+
+        public void setNotificationServicePort(int notificationServicePort) { // NOTE(review): presumably bound from the notificationServicePort config key - confirm
+            this.notificationServicePort = notificationServicePort;
+        }
     }
 
 
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index d706c16..9b056a5 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -17,17 +17,21 @@
 
 package org.apache.airavata.datalake.orchestrator.handlers.async;
 
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationRegisterRequest;
 import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
 import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
+import org.dozer.DozerBeanMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Orchestrator event handler
@@ -39,9 +43,9 @@ public class OrchestratorEventHandler {
     private Configuration configuration;
 
     private ExecutorService executorService;
-    private ScheduledExecutorService ouboundExecutorService;
     private MessageConsumer messageConsumer;
     private final Set<String> eventCache = new HashSet<>();
+    private NotificationClient notificationClient;
 
     public OrchestratorEventHandler() {
     }
@@ -49,11 +53,13 @@ public class OrchestratorEventHandler {
     public void init(Configuration configuration) throws Exception {
         this.configuration = configuration;
         this.executorService = Executors.newFixedThreadPool(configuration.getEventProcessorWorkers());
-        this.ouboundExecutorService = Executors.newSingleThreadScheduledExecutor();
         messageConsumer = new MessageConsumer(configuration.getConsumer().getBrokerURL(),
                 configuration.getConsumer().getConsumerGroup(),
                 configuration.getConsumer().getMaxPollRecordsConfig(),
                 configuration.getConsumer().getTopic());
+        this.notificationClient = new NotificationClient(
+                configuration.getOutboundEventProcessor().getNotificationServiceHost(),
+                configuration.getOutboundEventProcessor().getNotificationServicePort());
     }
 
     public void startProcessing() throws Exception {
@@ -63,8 +69,19 @@ public class OrchestratorEventHandler {
             try {
                 if (!eventCache.contains(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName())) {
                     eventCache.add(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+
+                    Notification.Builder notificationBuilder = Notification.newBuilder();
+                    DozerBeanMapper mapper = new DozerBeanMapper();
+                    mapper.map(notificationEvent, notificationBuilder);
+                    notificationBuilder.setNotificationId(UUID.randomUUID().toString());
+
+                    Notification notification = notificationBuilder.build();
+                    LOGGER.info("Registering notification in the database");
+                    this.notificationClient.get().registerNotification(NotificationRegisterRequest
+                            .newBuilder().setNotification(notification).build());
+
                     this.executorService.submit(new OrchestratorEventProcessor(
-                            configuration, notificationEvent, eventCache));
+                            configuration, notification, eventCache, notificationClient));
                 } else {
                     LOGGER.info("Event is already processing");
                 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 53f02b6..8ed7c75 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -17,6 +17,9 @@
 
 package org.apache.airavata.datalake.orchestrator.handlers.async;
 
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatus;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatusRegisterRequest;
 import org.apache.airavata.datalake.drms.resource.GenericResource;
 import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
 import org.apache.airavata.datalake.drms.storage.TransferMapping;
@@ -24,7 +27,7 @@ import org.apache.airavata.datalake.orchestrator.Configuration;
 import org.apache.airavata.datalake.orchestrator.Utils;
 import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
 import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
 import org.apache.airavata.mft.api.client.MFTApiClient;
 import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
 import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
@@ -41,20 +44,22 @@ public class OrchestratorEventProcessor implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(OrchestratorEventProcessor.class);
 
-    private NotificationEvent notificationEvent;
+    private final Notification notification;
 
-    private DRMSConnector drmsConnector;
-    private Configuration configuration;
-    private WorkflowServiceConnector workflowServiceConnector;
+    private final DRMSConnector drmsConnector;
+    private final Configuration configuration;
+    private final WorkflowServiceConnector workflowServiceConnector;
     private final Set<String> eventCache;
+    private final NotificationClient notificationClient;
 
-    public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
-                                      Set<String> eventCache) throws Exception {
-        this.notificationEvent = notificationEvent;
+    public OrchestratorEventProcessor(Configuration configuration, Notification notificationEvent,
+                                      Set<String> eventCache, NotificationClient notificationClient) throws Exception {
+        this.notification = notificationEvent;
         this.eventCache = eventCache;
         this.drmsConnector = new DRMSConnector(configuration);
         this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
         this.configuration = configuration;
+        this.notificationClient = notificationClient;
     }
 
     private List<GenericResource> createResourceRecursively(String storageId, String basePath,
@@ -74,8 +79,8 @@ public class OrchestratorEventProcessor implements Runnable {
             currentPath = currentPath + "/" + resourceName;
             String resourceId = Utils.getId(storageId + ":" + currentPath);
             Optional<GenericResource> optionalGenericResource =
-                    this.drmsConnector.createResource(notificationEvent.getAuthToken(),
-                            notificationEvent.getTenantId(),
+                    this.drmsConnector.createResource(notification.getAuthToken(),
+                            notification.getTenantId(),
                             resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
             if (optionalGenericResource.isPresent()) {
                 parentId = optionalGenericResource.get().getResourceId();
@@ -90,8 +95,8 @@ public class OrchestratorEventProcessor implements Runnable {
         currentPath = currentPath + "/" + splitted[splitted.length - 1];
 
         Optional<GenericResource> optionalGenericResource =
-                this.drmsConnector.createResource(notificationEvent.getAuthToken(),
-                        notificationEvent.getTenantId(),
+                this.drmsConnector.createResource(notification.getAuthToken(),
+                        notification.getTenantId(),
                         Utils.getId(storageId + ":" + currentPath),
                         splitted[splitted.length - 1], currentPath,
                         parentId, resourceType, parentType, user);
@@ -111,7 +116,7 @@ public class OrchestratorEventProcessor implements Runnable {
         for (GenericResource resource : resourceList) {
             logger.info("Sharing resource {} with path {} with user {}",
                     resource.getResourceId(), resource.getResourcePath(), user);
-            this.drmsConnector.shareWithUser(notificationEvent.getAuthToken(), notificationEvent.getTenantId(),
+            this.drmsConnector.shareWithUser(notification.getAuthToken(), notification.getTenantId(),
                     admin, user, resource.getResourceId(), permission);
         }
     }
@@ -120,25 +125,34 @@ public class OrchestratorEventProcessor implements Runnable {
         for (GenericResource resource : resourceList) {
             logger.info("Sharing resource {} with path {} with group {}",
                     resource.getResourceId(), resource.getResourcePath(), group);
-            this.drmsConnector.shareWithGroup(notificationEvent.getAuthToken(), notificationEvent.getTenantId(),
+            this.drmsConnector.shareWithGroup(notification.getAuthToken(), notification.getTenantId(),
                     admin, group, resource.getResourceId(), permission);
         }
     }
 
     @Override
     public void run() {
-        logger.info("Processing resource path {} on storage {}", notificationEvent.getResourcePath(),
-                notificationEvent.getBasePath());
+        logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
+                notification.getBasePath());
 
         try {
 
-            if (!"FOLDER".equals(notificationEvent.getResourceType())) {
+            this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+                    .setStatus(NotificationStatus.newBuilder()
+                            .setStatusId(UUID.randomUUID().toString())
+                            .setNotificationId(notification.getNotificationId())
+                            .setStatus(NotificationStatus.StatusType.DATA_ORCH_RECEIVED)
+                            .setDescription("Notification Received")
+                            .setPublishedTime(System.currentTimeMillis())
+                            .build()).build());
+
+            if (!"FOLDER".equals(notification.getResourceType())) {
                 logger.error("Resource {} should be a Folder type but got {}",
-                        notificationEvent.getResourcePath(),
-                        notificationEvent.getResourceType());
+                        notification.getResourcePath(),
+                        notification.getResourceType());
                 logger.error("Resource should be a Folder type");
             }
-            String removeBasePath = notificationEvent.getResourcePath().substring(notificationEvent.getBasePath().length());
+            String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
             String[] splitted = removeBasePath.split("/");
 
             String adminUser = splitted[0];
@@ -149,12 +163,12 @@ public class OrchestratorEventProcessor implements Runnable {
             ownerRules.put(splitted[1], "OWNER");
 
             Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
-                    notificationEvent.getAuthToken(),
-                    notificationEvent.getTenantId(), adminUser,
-                    notificationEvent.getHostName());
+                    notification.getAuthToken(),
+                    notification.getTenantId(), adminUser,
+                    notification.getHostName());
 
             if (optionalTransferMapping.isEmpty()) {
-                logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notificationEvent.getHostName());
+                logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notification.getHostName());
                 throw new Exception("Could not find a transfer mapping");
             }
 
@@ -166,8 +180,8 @@ public class OrchestratorEventProcessor implements Runnable {
             // Creating parent resource
 
             List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
-                    notificationEvent.getBasePath(),
-                    notificationEvent.getResourcePath(),
+                    notification.getBasePath(),
+                    notification.getResourcePath(),
                     "COLLECTION", adminUser);
 
             shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
@@ -184,8 +198,8 @@ public class OrchestratorEventProcessor implements Runnable {
             GenericResource resourceObj = resourceList.get(resourceList.size() - 1);
 
             Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
-                    notificationEvent.getAuthToken(), adminUser,
-                    notificationEvent.getTenantId(), sourceStorageId);
+                    notification.getAuthToken(), adminUser,
+                    notification.getTenantId(), sourceStorageId);
 
             if (sourceSPOp.isEmpty()) {
                 logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
@@ -193,8 +207,8 @@ public class OrchestratorEventProcessor implements Runnable {
             }
 
             Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
-                    notificationEvent.getAuthToken(), adminUser,
-                    notificationEvent.getTenantId(), destinationStorageId);
+                    notification.getAuthToken(), adminUser,
+                    notification.getTenantId(), destinationStorageId);
 
             if (destSPOp.isEmpty()) {
                 logger.error("No storage preference found for destination storage {} and user {}", sourceStorageId, adminUser);
@@ -204,7 +218,7 @@ public class OrchestratorEventProcessor implements Runnable {
             AnyStoragePreference sourceSP = sourceSPOp.get();
             AnyStoragePreference destSP = destSPOp.get();
 
-            String decodedAuth = new String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
+            String decodedAuth = new String(Base64.getDecoder().decode(notification.getAuthToken()));
             String[] authParts = decodedAuth.split(":");
 
             if (authParts.length != 2) {
@@ -215,7 +229,7 @@ public class OrchestratorEventProcessor implements Runnable {
                     .setUserId(adminUser)
                     .setClientId(authParts[0])
                     .setClientSecret(authParts[1])
-                    .putProperties("TENANT_ID", notificationEvent.getTenantId()).build();
+                    .putProperties("TENANT_ID", notification.getTenantId()).build();
 
             AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
 
@@ -248,7 +262,7 @@ public class OrchestratorEventProcessor implements Runnable {
             List<String> resourceIDsToProcess = new ArrayList<>();
             for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
                 logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
-                resourceList = createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+                resourceList = createResourceRecursively(sourceStorageId, notification.getBasePath(),
                         fileMetadata.getResourcePath(), "FILE", adminUser);
                 GenericResource fileResource = resourceList.get(resourceList.size() - 1);
 
@@ -257,32 +271,48 @@ public class OrchestratorEventProcessor implements Runnable {
 
             for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
                 logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
-                createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+                createResourceRecursively(sourceStorageId, notification.getBasePath(),
                         directoryMetadata.getResourcePath(),
                         "COLLECTION", adminUser);
                 // TODO scan directories
             }
 
-            logger.info("Creating destination zip resource for directory {}", notificationEvent.getResourcePath());
-            resourceList = createResourceRecursively(destinationStorageId, notificationEvent.getBasePath(),
-                    notificationEvent.getResourcePath(), "FILE", adminUser);
+            logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
+            resourceList = createResourceRecursively(destinationStorageId, notification.getBasePath(),
+                    notification.getResourcePath(), "FILE", adminUser);
 
             GenericResource destinationResource = resourceList.get(resourceList.size() - 1);
 
-            System.out.println(destinationResource);
-
             logger.info("Submitting resources to workflow manager");
-            this.workflowServiceConnector.invokeWorkflow(notificationEvent.getAuthToken(), adminUser,
-                    notificationEvent.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
+            this.workflowServiceConnector.invokeWorkflow(notification.getAuthToken(), adminUser,
+                    notification.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
                     destinationResource.getResourceId(), destSP.getSshStoragePreference().getStoragePreferenceId());
 
 
-            logger.info("Completed processing path {}", notificationEvent.getResourcePath());
+            this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+                    .setStatus(NotificationStatus.newBuilder()
+                            .setStatusId(UUID.randomUUID().toString())
+                            .setNotificationId(notification.getNotificationId())
+                            .setStatus(NotificationStatus.StatusType.DISPATCHED_TO_WORFLOW_ENGING)
+                            .setDescription("Notification successfully processed at the orchestrator. " +
+                                    "Sending to workflow manager")
+                            .setPublishedTime(System.currentTimeMillis())
+                            .build()).build());
+
+            logger.info("Completed processing path {}", notification.getResourcePath());
 
         } catch (Exception e) {
-            logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+            logger.error("Failed to process event for resource path {}", notification.getResourcePath(), e);
+            this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+                    .setStatus(NotificationStatus.newBuilder()
+                            .setStatusId(UUID.randomUUID().toString())
+                            .setNotificationId(notification.getNotificationId())
+                            .setStatus(NotificationStatus.StatusType.ERRORED)
+                            .setDescription("Notification failed due to : " + e.getMessage())
+                            .setPublishedTime(System.currentTimeMillis())
+                            .build()).build());
         } finally {
-            this.eventCache.remove(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+            this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
         }
     }
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
new file mode 100644
index 0000000..d44d81d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.handlers.grpc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.*;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationEntityRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationStatusEntityRepository;
+import org.dozer.DozerBeanMapper;
+import org.lognet.springboot.grpc.GRpcService;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * gRPC endpoint of the notification service. Stores incoming notifications and
+ * notification statuses through the injected repositories and lists the stored
+ * records back to callers.
+ */
+@GRpcService
+public class NotificationApiHandler extends NotificationServiceGrpc.NotificationServiceImplBase {
+
+    /**
+     * Mapper shared by all calls. Dozer's documentation recommends reusing a single
+     * mapper instance because constructing one per request is needlessly expensive.
+     */
+    private final DozerBeanMapper mapper = new DozerBeanMapper();
+
+    @Autowired
+    private NotificationEntityRepository notificationRepository;
+
+    @Autowired
+    private NotificationStatusEntityRepository notificationStatusRepository;
+
+    /**
+     * Maps the notification in the request onto a {@link NotificationEntity}, saves it
+     * and completes the call with an empty response.
+     *
+     * @param request          request wrapping the notification to store
+     * @param responseObserver observer that receives the empty acknowledgement
+     */
+    @Override
+    public void registerNotification(NotificationRegisterRequest request, StreamObserver<NotificationRegisterResponse> responseObserver) {
+        NotificationEntity notificationEntity = mapper.map(request.getNotification(), NotificationEntity.class);
+        notificationRepository.save(notificationEntity);
+        responseObserver.onNext(NotificationRegisterResponse.newBuilder().build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Loads all notifications with {@code findAll()} and returns them in one response.
+     * The request message is not read.
+     *
+     * @param request          list request (unused)
+     * @param responseObserver observer that receives the list of notifications
+     */
+    @Override
+    public void listNotifications(NotificationListRequest request, StreamObserver<NotificationListResponse> responseObserver) {
+        List<NotificationEntity> allEntities = notificationRepository.findAll();
+        NotificationListResponse.Builder responseBuilder = NotificationListResponse.newBuilder();
+        for (NotificationEntity e : allEntities) {
+            Notification.Builder builder = Notification.newBuilder();
+            mapper.map(e, builder);
+            responseBuilder.addNotifications(builder.build());
+        }
+        responseObserver.onNext(responseBuilder.build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Maps the status in the request onto a {@link NotificationStatusEntity}, saves it
+     * and completes the call with an empty response.
+     *
+     * @param request          request wrapping the notification status to store
+     * @param responseObserver observer that receives the empty acknowledgement
+     */
+    @Override
+    public void registerNotificationStatus(NotificationStatusRegisterRequest request, StreamObserver<NotificationStatusRegisterResponse> responseObserver) {
+        NotificationStatusEntity entity = mapper.map(request.getStatus(), NotificationStatusEntity.class);
+        notificationStatusRepository.save(entity);
+        responseObserver.onNext(NotificationStatusRegisterResponse.newBuilder().build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Loads all notification statuses with {@code findAll()} and returns them in one
+     * response. The request message is not read.
+     *
+     * @param request          list request (unused)
+     * @param responseObserver observer that receives the list of statuses
+     */
+    @Override
+    public void listNotificationStatus(NotificationStatusListRequest request, StreamObserver<NotificationStatusListResponse> responseObserver) {
+        List<NotificationStatusEntity> allEntities = notificationStatusRepository.findAll();
+        NotificationStatusListResponse.Builder responseBuilder = NotificationStatusListResponse.newBuilder();
+        for (NotificationStatusEntity e : allEntities) {
+            NotificationStatus.Builder builder = NotificationStatus.newBuilder();
+            mapper.map(e, builder);
+            responseBuilder.addStatuses(builder.build());
+        }
+        responseObserver.onNext(responseBuilder.build());
+        responseObserver.onCompleted();
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
new file mode 100644
index 0000000..141bfb4
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+package org.apache.airavata.datalake.data.orchestrator.api.stub.notification;
+
+import "google/api/annotations.proto";
+import "google/protobuf/empty.proto";
+
+message Notification { // Event about a resource; the API server maps it to NotificationEntity and stores it
+    string notificationId = 1; // id that NotificationStatus.notificationId points back to
+    string resourcePath = 2; // path of the resource; the orchestrator cuts basePath off its front
+    string resourceType = 3; // the orchestrator expects "FOLDER" and logs an error otherwise
+    int64 occuredTime = 4; // time of the event; presumably epoch millis like publishedTime - TODO confirm
+    string authToken = 5; // Base64 text that the orchestrator decodes and splits into clientId:clientSecret
+    string tenantId = 6; // tenant passed along to the DRMS and workflow calls
+    string hostName = 7; // host used (with the admin user) to look up the active transfer mapping
+    string basePath = 8; // leading part of resourcePath that is removed before the path is split
+    string eventType = 9; // kind of event; values are not visible in this change - TODO confirm
+}
+
+message NotificationStatus { // One processing-state record attached to a notification
+    string statusId = 1; // unique id; the orchestrator fills it with a random UUID
+    enum StatusType {
+        DATA_ORCH_RECEIVED = 0; // recorded when the orchestrator starts processing; also the proto3 default value
+        DISPATCHED_TO_WORFLOW_ENGING = 1; // recorded after the workflow is invoked (misspelled, but generated code uses this name)
+        DATA_ORCH_PROCESSED_AND_SKIPPED = 2;
+        PARSING = 3;
+        COMPLETED = 4;
+        ERRORED = 5; // recorded when processing throws; description then carries the exception message
+    }
+    StatusType status = 2;
+    int64 publishedTime = 3; // the orchestrator sets it from System.currentTimeMillis(), i.e. epoch milliseconds
+    string notificationId = 4; // id of the Notification this status belongs to
+    string description = 5; // free-text explanation, e.g. "Notification Received"
+
+}
+
+message NotificationRegisterRequest { // input of registerNotification
+    Notification notification = 1; // notification to persist
+}
+
+message NotificationRegisterResponse { // empty acknowledgement returned by registerNotification
+}
+
+message NotificationListRequest { // input of listNotifications; carries no fields, so no filtering is possible
+}
+
+message NotificationListResponse { // output of listNotifications
+    repeated Notification notifications = 1; // all stored notifications (the handler reads them with findAll())
+}
+
+message NotificationStatusRegisterRequest { // input of registerNotificationStatus
+    NotificationStatus status = 1; // status record to persist
+}
+
+message NotificationStatusRegisterResponse { // empty acknowledgement returned by registerNotificationStatus
+}
+
+message NotificationStatusListRequest { // input of listNotificationStatus; carries no fields, so no filtering is possible
+}
+
+message NotificationStatusListResponse { // output of listNotificationStatus
+    repeated NotificationStatus statuses = 1; // all stored status records (the handler reads them with findAll())
+}
+
+service NotificationService { // served by NotificationApiHandler in the data orchestrator API server
+    // Persists the notification carried by the request and replies with an empty message.
+    rpc registerNotification (NotificationRegisterRequest) returns (NotificationRegisterResponse) {
+    }
+    // Returns every stored notification in a single response.
+    rpc listNotifications (NotificationListRequest) returns (NotificationListResponse) {
+    }
+    // Persists the status record carried by the request and replies with an empty message.
+    rpc registerNotificationStatus (NotificationStatusRegisterRequest) returns (NotificationStatusRegisterResponse) {
+    }
+    // Returns every stored notification status in a single response.
+    rpc listNotificationStatus(NotificationStatusListRequest) returns (NotificationStatusListResponse) {
+    }
+}
\ No newline at end of file