You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/05 21:30:06 UTC
[airavata-data-lake] branch master updated: Persisting notification
and events in the database. Implementing the grpc service
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 501b613 Persisting notification and events in the database. Implementing the grpc service
501b613 is described below
commit 501b613f1366a7b60400f25fba5c79104c161c95
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sun Sep 5 17:29:57 2021 -0400
Persisting notification and events in the database. Implementing the grpc service
---
.../templates/data-orchestrator/config.yml.j2 | 2 +
ansible/roles/workflow-engine/tasks/main.yml | 7 ++
.../data-orchestrator-clients-core/pom.xml | 5 +
.../clients/core/NotificationClient.java | 45 ++++++++
.../entity/notification/NotificationEntity.java | 128 +++++++++++++++++++++
.../notification/NotificationStatusEntity.java | 93 +++++++++++++++
.../NotificationEntityRepository.java} | 12 +-
.../NotificationStatusEntityRepository.java} | 12 +-
.../data-orchestrator-api-server/pom.xml | 5 +
.../datalake/orchestrator/Configuration.java | 23 +++-
.../handlers/async/OrchestratorEventHandler.java | 25 +++-
.../handlers/async/OrchestratorEventProcessor.java | 118 ++++++++++++-------
.../handlers/grpc/NotificationApiHandler.java | 88 ++++++++++++++
.../src/main/proto/notification.proto | 96 ++++++++++++++++
14 files changed, 591 insertions(+), 68 deletions(-)
diff --git a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2 b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
index f64e465..7816641 100644
--- a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
+++ b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
@@ -12,6 +12,8 @@ outboundEventProcessor:
drmsPort: {{ datalake_drms_grpc_port }}
mftHost: "{{ mft_api_service_host }}"
mftPort: {{ mft_api_service_grpc_port }}
+ notificationServiceHost: "localhost"
+ notificationServicePort: {{ datalake_data_orch_grpc_port }}
consumer:
brokerURL: "{{ datalake_data_orch_broker_url }}"
consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
diff --git a/ansible/roles/workflow-engine/tasks/main.yml b/ansible/roles/workflow-engine/tasks/main.yml
index 856fdc6..f5f3071 100644
--- a/ansible/roles/workflow-engine/tasks/main.yml
+++ b/ansible/roles/workflow-engine/tasks/main.yml
@@ -15,6 +15,13 @@
become: yes
become_user: "{{ user }}"
+- name: open firewall ports for Workflow Engine
+ firewalld: port="{{ item }}/tcp"
+ zone=public permanent=true state=enabled immediate=yes
+ with_items:
+ - "{{ workflow_manager_grpc_port }}"
+ become: yes
+
- name: Run Datalake maven build
command: mvn clean install -Dmaven.test.skip=true chdir="{{ datalake_source_dir }}/"
environment:
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
index 6ec8b80..e3cb9f9 100644
--- a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/pom.xml
@@ -22,6 +22,11 @@
<artifactId>data-orchestrator-messaging</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-api-stub</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
diff --git a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java
new file mode 100644
index 0000000..9c3d076
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/NotificationClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.dataorchestrator.clients.core;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationServiceGrpc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class NotificationClient implements Closeable {
+
+ private final ManagedChannel channel;
+
+ public NotificationClient(String hostName, int port) {
+ channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
+ }
+
+ public NotificationServiceGrpc.NotificationServiceBlockingStub get() {
+ return NotificationServiceGrpc.newBlockingStub(channel);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java
new file mode 100644
index 0000000..3737aa7
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationEntity.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+@Entity
+@Table(name = "NOTIFICATION_ENTITY")
+public class NotificationEntity {
+
+ @Id
+ @Column(name = "NOTIFICATION_ID")
+ private String notificationId;
+
+ @Column(name = "RESOURCE_PATH")
+ private String resourcePath;
+
+ @Column(name = "RESOURCE_TYPE")
+ private String resourceType;
+
+ @Column(name = "OCCURED_TIME")
+ private long occuredTime;
+
+ @Column(name = "AUTH_TOKEN")
+ private String authToken;
+
+ @Column(name = "TENANT_ID")
+ private String tenantId;
+
+ @Column(name = "HOSTNAME")
+ private String hostName;
+
+ @Column(name = "BASE_PATH")
+ private String basePath;
+
+ @Column(name = "EVENT_TYPE")
+ private String eventType;
+
+ public String getNotificationId() {
+ return notificationId;
+ }
+
+ public void setNotificationId(String notificationId) {
+ this.notificationId = notificationId;
+ }
+
+ public String getResourcePath() {
+ return resourcePath;
+ }
+
+ public void setResourcePath(String resourcePath) {
+ this.resourcePath = resourcePath;
+ }
+
+ public String getResourceType() {
+ return resourceType;
+ }
+
+ public void setResourceType(String resourceType) {
+ this.resourceType = resourceType;
+ }
+
+ public long getOccuredTime() {
+ return occuredTime;
+ }
+
+ public void setOccuredTime(long occuredTime) {
+ this.occuredTime = occuredTime;
+ }
+
+ public String getAuthToken() {
+ return authToken;
+ }
+
+ public void setAuthToken(String authToken) {
+ this.authToken = authToken;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(String eventType) {
+ this.eventType = eventType;
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java
new file mode 100644
index 0000000..9a4fb89
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/notification/NotificationStatusEntity.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;
+
+import javax.persistence.*;
+
+@Entity
+@Table(name = "NOTIFICATION_STATUS_ENTITY")
+public class NotificationStatusEntity {
+
+ @Id
+ @Column(name = "STATUS_ID")
+ private String statusId;
+
+ @Column(name = "STATUS")
+ private String status;
+
+ @Column(name = "PUBLISHED_TIME")
+ private long publishedTime;
+
+ @Column(name = "DESCRIPTION", columnDefinition = "TEXT")
+ private String description;
+
+ @Column(name = "NOTIFICATION_ID")
+ private String notificationId;
+
+ @ManyToOne(fetch = FetchType.LAZY)
+ @JoinColumn(name = "NOTIFICATION_ID", insertable=false, updatable=false)
+ private NotificationEntity notificationEntity;
+
+ public String getStatusId() {
+ return statusId;
+ }
+
+ public void setStatusId(String statusId) {
+ this.statusId = statusId;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public long getPublishedTime() {
+ return publishedTime;
+ }
+
+ public void setPublishedTime(long publishedTime) {
+ this.publishedTime = publishedTime;
+ }
+
+ public NotificationEntity getNotificationEntity() {
+ return notificationEntity;
+ }
+
+ public void setNotificationEntity(NotificationEntity notificationEntity) {
+ this.notificationEntity = notificationEntity;
+ }
+
+ public String getNotificationId() {
+ return notificationId;
+ }
+
+ public void setNotificationId(String notificationId) {
+ this.notificationId = notificationId;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
similarity index 75%
copy from data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
copy to data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
index 0df50f3..a14d5a0 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationEntityRepository.java
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
+package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-public enum EventStatus {
- DATA_ORCH_RECEIVED,
- DISPATCHED_TO_WORFLOW_ENGING,
- DATA_ORCH_PROCESSED_AND_SKIPPED,
- MFT_CALLBACK_RECEIVED,
- COMPLETED,
- ERRORED,
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
+import org.springframework.data.jpa.repository.JpaRepository;
+public interface NotificationEntityRepository extends JpaRepository<NotificationEntity, String> {
}
diff --git a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
similarity index 73%
rename from data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
rename to data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
index 0df50f3..565cbb2 100644
--- a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/EventStatus.java
+++ b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/NotificationStatusEntityRepository.java
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
+package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-public enum EventStatus {
- DATA_ORCH_RECEIVED,
- DISPATCHED_TO_WORFLOW_ENGING,
- DATA_ORCH_PROCESSED_AND_SKIPPED,
- MFT_CALLBACK_RECEIVED,
- COMPLETED,
- ERRORED,
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
+import org.springframework.data.jpa.repository.JpaRepository;
+public interface NotificationStatusEntityRepository extends JpaRepository<NotificationStatusEntity, String> {
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
index c7e5291..9b7d5be 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/pom.xml
@@ -82,6 +82,11 @@
</dependency>
<dependency>
<groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-clients-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
<artifactId>data-orchestrator-registry</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
index 8e6ca47..0f33438 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
@@ -165,11 +165,10 @@ public class Configuration {
private int pollingInterval;
private String mftHost;
private int mftPort;
+ private String notificationServiceHost;
+ private int notificationServicePort;
-
- public OutboundEventProcessorConfig() {
- }
-
+ public OutboundEventProcessorConfig() {}
public String getWorkflowEngineHost() {
return workflowEngineHost;
@@ -226,6 +225,22 @@ public class Configuration {
public void setMftPort(int mftPort) {
this.mftPort = mftPort;
}
+
+ public String getNotificationServiceHost() {
+ return notificationServiceHost;
+ }
+
+ public void setNotificationServiceHost(String notificationServiceHost) {
+ this.notificationServiceHost = notificationServiceHost;
+ }
+
+ public int getNotificationServicePort() {
+ return notificationServicePort;
+ }
+
+ public void setNotificationServicePort(int notificationServicePort) {
+ this.notificationServicePort = notificationServicePort;
+ }
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index d706c16..9b056a5 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -17,17 +17,21 @@
package org.apache.airavata.datalake.orchestrator.handlers.async;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationRegisterRequest;
import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
+import org.dozer.DozerBeanMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
/**
* Orchestrator event handler
@@ -39,9 +43,9 @@ public class OrchestratorEventHandler {
private Configuration configuration;
private ExecutorService executorService;
- private ScheduledExecutorService ouboundExecutorService;
private MessageConsumer messageConsumer;
private final Set<String> eventCache = new HashSet<>();
+ private NotificationClient notificationClient;
public OrchestratorEventHandler() {
}
@@ -49,11 +53,13 @@ public class OrchestratorEventHandler {
public void init(Configuration configuration) throws Exception {
this.configuration = configuration;
this.executorService = Executors.newFixedThreadPool(configuration.getEventProcessorWorkers());
- this.ouboundExecutorService = Executors.newSingleThreadScheduledExecutor();
messageConsumer = new MessageConsumer(configuration.getConsumer().getBrokerURL(),
configuration.getConsumer().getConsumerGroup(),
configuration.getConsumer().getMaxPollRecordsConfig(),
configuration.getConsumer().getTopic());
+ this.notificationClient = new NotificationClient(
+ configuration.getOutboundEventProcessor().getNotificationServiceHost(),
+ configuration.getOutboundEventProcessor().getNotificationServicePort());
}
public void startProcessing() throws Exception {
@@ -63,8 +69,19 @@ public class OrchestratorEventHandler {
try {
if (!eventCache.contains(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName())) {
eventCache.add(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+
+ Notification.Builder notificationBuilder = Notification.newBuilder();
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ mapper.map(notificationEvent, notificationBuilder);
+ notificationBuilder.setNotificationId(UUID.randomUUID().toString());
+
+ Notification notification = notificationBuilder.build();
+ LOGGER.info("Registering notification in the database");
+ this.notificationClient.get().registerNotification(NotificationRegisterRequest
+ .newBuilder().setNotification(notification).build());
+
this.executorService.submit(new OrchestratorEventProcessor(
- configuration, notificationEvent, eventCache));
+ configuration, notification, eventCache, notificationClient));
} else {
LOGGER.info("Event is already processing");
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index 53f02b6..8ed7c75 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -17,6 +17,9 @@
package org.apache.airavata.datalake.orchestrator.handlers.async;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatus;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatusRegisterRequest;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
@@ -24,7 +27,7 @@ import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
import org.apache.airavata.mft.api.client.MFTApiClient;
import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
@@ -41,20 +44,22 @@ public class OrchestratorEventProcessor implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(OrchestratorEventProcessor.class);
- private NotificationEvent notificationEvent;
+ private final Notification notification;
- private DRMSConnector drmsConnector;
- private Configuration configuration;
- private WorkflowServiceConnector workflowServiceConnector;
+ private final DRMSConnector drmsConnector;
+ private final Configuration configuration;
+ private final WorkflowServiceConnector workflowServiceConnector;
private final Set<String> eventCache;
+ private final NotificationClient notificationClient;
- public OrchestratorEventProcessor(Configuration configuration, NotificationEvent notificationEvent,
- Set<String> eventCache) throws Exception {
- this.notificationEvent = notificationEvent;
+ public OrchestratorEventProcessor(Configuration configuration, Notification notificationEvent,
+ Set<String> eventCache, NotificationClient notificationClient) throws Exception {
+ this.notification = notificationEvent;
this.eventCache = eventCache;
this.drmsConnector = new DRMSConnector(configuration);
this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
this.configuration = configuration;
+ this.notificationClient = notificationClient;
}
private List<GenericResource> createResourceRecursively(String storageId, String basePath,
@@ -74,8 +79,8 @@ public class OrchestratorEventProcessor implements Runnable {
currentPath = currentPath + "/" + resourceName;
String resourceId = Utils.getId(storageId + ":" + currentPath);
Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(notificationEvent.getAuthToken(),
- notificationEvent.getTenantId(),
+ this.drmsConnector.createResource(notification.getAuthToken(),
+ notification.getTenantId(),
resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
if (optionalGenericResource.isPresent()) {
parentId = optionalGenericResource.get().getResourceId();
@@ -90,8 +95,8 @@ public class OrchestratorEventProcessor implements Runnable {
currentPath = currentPath + "/" + splitted[splitted.length - 1];
Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(notificationEvent.getAuthToken(),
- notificationEvent.getTenantId(),
+ this.drmsConnector.createResource(notification.getAuthToken(),
+ notification.getTenantId(),
Utils.getId(storageId + ":" + currentPath),
splitted[splitted.length - 1], currentPath,
parentId, resourceType, parentType, user);
@@ -111,7 +116,7 @@ public class OrchestratorEventProcessor implements Runnable {
for (GenericResource resource : resourceList) {
logger.info("Sharing resource {} with path {} with user {}",
resource.getResourceId(), resource.getResourcePath(), user);
- this.drmsConnector.shareWithUser(notificationEvent.getAuthToken(), notificationEvent.getTenantId(),
+ this.drmsConnector.shareWithUser(notification.getAuthToken(), notification.getTenantId(),
admin, user, resource.getResourceId(), permission);
}
}
@@ -120,25 +125,34 @@ public class OrchestratorEventProcessor implements Runnable {
for (GenericResource resource : resourceList) {
logger.info("Sharing resource {} with path {} with group {}",
resource.getResourceId(), resource.getResourcePath(), group);
- this.drmsConnector.shareWithGroup(notificationEvent.getAuthToken(), notificationEvent.getTenantId(),
+ this.drmsConnector.shareWithGroup(notification.getAuthToken(), notification.getTenantId(),
admin, group, resource.getResourceId(), permission);
}
}
@Override
public void run() {
- logger.info("Processing resource path {} on storage {}", notificationEvent.getResourcePath(),
- notificationEvent.getBasePath());
+ logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
+ notification.getBasePath());
try {
- if (!"FOLDER".equals(notificationEvent.getResourceType())) {
+ this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+ .setStatus(NotificationStatus.newBuilder()
+ .setStatusId(UUID.randomUUID().toString())
+ .setNotificationId(notification.getNotificationId())
+ .setStatus(NotificationStatus.StatusType.DATA_ORCH_RECEIVED)
+ .setDescription("Notification Received")
+ .setPublishedTime(System.currentTimeMillis())
+ .build()).build());
+
+ if (!"FOLDER".equals(notification.getResourceType())) {
logger.error("Resource {} should be a Folder type but got {}",
- notificationEvent.getResourcePath(),
- notificationEvent.getResourceType());
+ notification.getResourcePath(),
+ notification.getResourceType());
logger.error("Resource should be a Folder type");
}
- String removeBasePath = notificationEvent.getResourcePath().substring(notificationEvent.getBasePath().length());
+ String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
String[] splitted = removeBasePath.split("/");
String adminUser = splitted[0];
@@ -149,12 +163,12 @@ public class OrchestratorEventProcessor implements Runnable {
ownerRules.put(splitted[1], "OWNER");
Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
- notificationEvent.getAuthToken(),
- notificationEvent.getTenantId(), adminUser,
- notificationEvent.getHostName());
+ notification.getAuthToken(),
+ notification.getTenantId(), adminUser,
+ notification.getHostName());
if (optionalTransferMapping.isEmpty()) {
- logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notificationEvent.getHostName());
+ logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notification.getHostName());
throw new Exception("Could not find a transfer mapping");
}
@@ -166,8 +180,8 @@ public class OrchestratorEventProcessor implements Runnable {
// Creating parent resource
List<GenericResource> resourceList = createResourceRecursively(sourceStorageId,
- notificationEvent.getBasePath(),
- notificationEvent.getResourcePath(),
+ notification.getBasePath(),
+ notification.getResourcePath(),
"COLLECTION", adminUser);
shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
@@ -184,8 +198,8 @@ public class OrchestratorEventProcessor implements Runnable {
GenericResource resourceObj = resourceList.get(resourceList.size() - 1);
Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
- notificationEvent.getAuthToken(), adminUser,
- notificationEvent.getTenantId(), sourceStorageId);
+ notification.getAuthToken(), adminUser,
+ notification.getTenantId(), sourceStorageId);
if (sourceSPOp.isEmpty()) {
logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
@@ -193,8 +207,8 @@ public class OrchestratorEventProcessor implements Runnable {
}
Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
- notificationEvent.getAuthToken(), adminUser,
- notificationEvent.getTenantId(), destinationStorageId);
+ notification.getAuthToken(), adminUser,
+ notification.getTenantId(), destinationStorageId);
if (destSPOp.isEmpty()) {
logger.error("No storage preference found for destination storage {} and user {}", sourceStorageId, adminUser);
@@ -204,7 +218,7 @@ public class OrchestratorEventProcessor implements Runnable {
AnyStoragePreference sourceSP = sourceSPOp.get();
AnyStoragePreference destSP = destSPOp.get();
- String decodedAuth = new String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
+ String decodedAuth = new String(Base64.getDecoder().decode(notification.getAuthToken()));
String[] authParts = decodedAuth.split(":");
if (authParts.length != 2) {
@@ -215,7 +229,7 @@ public class OrchestratorEventProcessor implements Runnable {
.setUserId(adminUser)
.setClientId(authParts[0])
.setClientSecret(authParts[1])
- .putProperties("TENANT_ID", notificationEvent.getTenantId()).build();
+ .putProperties("TENANT_ID", notification.getTenantId()).build();
AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
@@ -248,7 +262,7 @@ public class OrchestratorEventProcessor implements Runnable {
List<String> resourceIDsToProcess = new ArrayList<>();
for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
- resourceList = createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+ resourceList = createResourceRecursively(sourceStorageId, notification.getBasePath(),
fileMetadata.getResourcePath(), "FILE", adminUser);
GenericResource fileResource = resourceList.get(resourceList.size() - 1);
@@ -257,32 +271,48 @@ public class OrchestratorEventProcessor implements Runnable {
for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
- createResourceRecursively(sourceStorageId, notificationEvent.getBasePath(),
+ createResourceRecursively(sourceStorageId, notification.getBasePath(),
directoryMetadata.getResourcePath(),
"COLLECTION", adminUser);
// TODO scan directories
}
- logger.info("Creating destination zip resource for directory {}", notificationEvent.getResourcePath());
- resourceList = createResourceRecursively(destinationStorageId, notificationEvent.getBasePath(),
- notificationEvent.getResourcePath(), "FILE", adminUser);
+ logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
+ resourceList = createResourceRecursively(destinationStorageId, notification.getBasePath(),
+ notification.getResourcePath(), "FILE", adminUser);
GenericResource destinationResource = resourceList.get(resourceList.size() - 1);
- System.out.println(destinationResource);
-
logger.info("Submitting resources to workflow manager");
- this.workflowServiceConnector.invokeWorkflow(notificationEvent.getAuthToken(), adminUser,
- notificationEvent.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
+ this.workflowServiceConnector.invokeWorkflow(notification.getAuthToken(), adminUser,
+ notification.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
destinationResource.getResourceId(), destSP.getSshStoragePreference().getStoragePreferenceId());
- logger.info("Completed processing path {}", notificationEvent.getResourcePath());
+ this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+ .setStatus(NotificationStatus.newBuilder()
+ .setStatusId(UUID.randomUUID().toString())
+ .setNotificationId(notification.getNotificationId())
+ .setStatus(NotificationStatus.StatusType.DISPATCHED_TO_WORFLOW_ENGING)
+ .setDescription("Notification successfully processed at the orchestrator. " +
+ "Sending to workflow manager")
+ .setPublishedTime(System.currentTimeMillis())
+ .build()).build());
+
+ logger.info("Completed processing path {}", notification.getResourcePath());
} catch (Exception e) {
- logger.error("Failed to process event for resource path {}", notificationEvent.getResourcePath(), e);
+ logger.error("Failed to process event for resource path {}", notification.getResourcePath(), e);
+ this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
+ .setStatus(NotificationStatus.newBuilder()
+ .setStatusId(UUID.randomUUID().toString())
+ .setNotificationId(notification.getNotificationId())
+ .setStatus(NotificationStatus.StatusType.ERRORED)
+ .setDescription("Notification failed due to : " + e.getMessage())
+ .setPublishedTime(System.currentTimeMillis())
+ .build()).build());
} finally {
- this.eventCache.remove(notificationEvent.getResourcePath() + ":" + notificationEvent.getHostName());
+ this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
}
}
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
new file mode 100644
index 0000000..d44d81d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.handlers.grpc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.*;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationEntityRepository;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationStatusEntityRepository;
+import org.dozer.DozerBeanMapper;
+import org.lognet.springboot.grpc.GRpcService;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+@GRpcService
+public class NotificationApiHandler extends NotificationServiceGrpc.NotificationServiceImplBase {
+
+ @Autowired
+ private NotificationEntityRepository notificationRepository;
+
+ @Autowired
+ private NotificationStatusEntityRepository notificationStatusRepository;
+
+ @Override
+ public void registerNotification(NotificationRegisterRequest request, StreamObserver<NotificationRegisterResponse> responseObserver) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ NotificationEntity notificationEntity = mapper.map(request.getNotification(), NotificationEntity.class);
+ notificationRepository.save(notificationEntity);
+ responseObserver.onNext(NotificationRegisterResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void listNotifications(NotificationListRequest request, StreamObserver<NotificationListResponse> responseObserver) {
+ List<NotificationEntity> allEntities = notificationRepository.findAll();
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ NotificationListResponse.Builder responseBuilder = NotificationListResponse.newBuilder();
+ for (NotificationEntity e : allEntities) {
+ Notification.Builder builder = Notification.newBuilder();
+ mapper.map(e, builder);
+ responseBuilder.addNotifications(builder.build());
+ }
+ responseObserver.onNext(responseBuilder.build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void registerNotificationStatus(NotificationStatusRegisterRequest request, StreamObserver<NotificationStatusRegisterResponse> responseObserver) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ NotificationStatusEntity entity = mapper.map(request.getStatus(), NotificationStatusEntity.class);
+ notificationStatusRepository.save(entity);
+ responseObserver.onNext(NotificationStatusRegisterResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void listNotificationStatus(NotificationStatusListRequest request, StreamObserver<NotificationStatusListResponse> responseObserver) {
+ List<NotificationStatusEntity> allEntities = notificationStatusRepository.findAll();
+ DozerBeanMapper mapper = new DozerBeanMapper();
+ NotificationStatusListResponse.Builder responseBuilder = NotificationStatusListResponse.newBuilder();
+ for (NotificationStatusEntity e : allEntities) {
+ NotificationStatus.Builder builder = NotificationStatus.newBuilder();
+ mapper.map(e, builder);
+ responseBuilder.addStatuses(builder.build());
+ }
+ responseObserver.onNext(responseBuilder.build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
new file mode 100644
index 0000000..141bfb4
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+package org.apache.airavata.datalake.data.orchestrator.api.stub.notification;
+
+import "google/api/annotations.proto";
+import "google/protobuf/empty.proto";
+
+message Notification {
+ string notificationId = 1;
+ string resourcePath = 2;
+ string resourceType = 3;
+ int64 occuredTime = 4;
+ string authToken = 5;
+ string tenantId = 6;
+ string hostName = 7;
+ string basePath = 8;
+ string eventType = 9;
+}
+
+message NotificationStatus {
+ string statusId = 1;
+ enum StatusType {
+ DATA_ORCH_RECEIVED = 0;
+ DISPATCHED_TO_WORFLOW_ENGING = 1;
+ DATA_ORCH_PROCESSED_AND_SKIPPED = 2;
+ PARSING = 3;
+ COMPLETED = 4;
+ ERRORED = 5;
+ }
+ StatusType status = 2;
+ int64 publishedTime = 3;
+ string notificationId = 4;
+ string description = 5;
+
+}
+
+message NotificationRegisterRequest {
+ Notification notification = 1;
+}
+
+message NotificationRegisterResponse {
+}
+
+message NotificationListRequest {
+}
+
+message NotificationListResponse {
+ repeated Notification notifications = 1;
+}
+
+message NotificationStatusRegisterRequest {
+ NotificationStatus status = 1;
+}
+
+message NotificationStatusRegisterResponse {
+}
+
+message NotificationStatusListRequest {
+}
+
+message NotificationStatusListResponse {
+ repeated NotificationStatus statuses = 1;
+}
+
+service NotificationService {
+
+ rpc registerNotification (NotificationRegisterRequest) returns (NotificationRegisterResponse) {
+ }
+
+ rpc listNotifications (NotificationListRequest) returns (NotificationListResponse) {
+ }
+
+ rpc registerNotificationStatus (NotificationStatusRegisterRequest) returns (NotificationStatusRegisterResponse) {
+ }
+
+ rpc listNotificationStatus(NotificationStatusListRequest) returns (NotificationStatusListResponse) {
+ }
+}
\ No newline at end of file