You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/05/09 16:40:59 UTC

[3/7] airavata git commit: Adding common messaging module for workers

Adding common messaging module for workers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/089ef37b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/089ef37b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/089ef37b

Branch: refs/heads/feature-workload-mgmt
Commit: 089ef37b31db8de2964b5913b5e8e58fcf610aec
Parents: 46cdeb3
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Tue May 9 12:36:49 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Tue May 9 12:36:49 2017 -0400

----------------------------------------------------------------------
 .../worker/task/envsetup/messaging/sample       |  1 -
 modules/worker/worker-messaging/pom.xml         | 41 +++++++++++++++++
 .../messaging/handler/WorkerMessageHandler.java | 48 ++++++++++++++++++++
 .../worker/messaging/impl/WorkerEngineImpl.java | 22 +++++++++
 .../messaging/runner/WorkerMessagingRunner.java | 11 +++++
 .../messaging/utils/WorkerMessagingFactory.java | 43 ++++++++++++++++++
 .../src/main/resources/worker-config.yaml       | 32 +++++++++++++
 .../src/test/java/TestWorkerMessaging.java      | 28 ++++++++++++
 8 files changed, 225 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
----------------------------------------------------------------------
diff --git a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
deleted file mode 100644
index f6c4fd0..0000000
--- a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
+++ /dev/null
@@ -1 +0,0 @@
-delete me
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/pom.xml b/modules/worker/worker-messaging/pom.xml
new file mode 100644
index 0000000..49793a3
--- /dev/null
+++ b/modules/worker/worker-messaging/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata-worker</artifactId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-worker-messaging</artifactId>
+    <name>Airavata Task Messaging</name>
+    <description>Handling Task messaging</description>
+    <url>http://airavata.apache.org/</url>
+
+    <properties>
+        <testng.version>6.1.1</testng.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-worker-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
new file mode 100644
index 0000000..49fa609
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.worker.messaging.handler;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessageHandler implements MessageHandler {
+
+    private final static Logger log = LoggerFactory.getLogger(WorkerMessageHandler.class);
+
+    @Override
+    public void onMessage(MessageContext messageContext) {
+        log.info("Worker received message. Message Id : " +  messageContext.getMessageId());
+        try {
+            TBase messageEvent = messageContext.getEvent();
+            byte[] bytes = new byte[0];
+            bytes = ThriftUtils.serializeThriftObject(messageEvent);
+            //ThriftUtils.createThriftFromBytes(bytes, event);
+            TaskContext taskContext = new TaskContext();
+
+            switch (taskContext.getTaskType()){
+
+                case ENV_SETUP: //TODO: call environment setup
+                    break;
+
+                case DATA_STAGING: //TODO: call data staging
+                    break;
+
+                case JOB_SUBMISSION: //TODO: call job submission
+                    break;
+
+                case MONITORING: //TODO: call monitoring
+                    break;
+            }
+
+        } catch (TException e) {
+            log.error("Error processing messaging. Message Id : " +  messageContext.getMessageId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
new file mode 100644
index 0000000..441053d
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
@@ -0,0 +1,22 @@
+package org.apache.airavata.worker.messaging.impl;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+
+/**
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerEngineImpl {
+
+//    private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws WorkerException {
+//        TaskStatus taskStatus = null;
+//        if (recover) {
+//            taskStatus = task.recover(taskCtx);
+//        } else {
+//            taskStatus = task.execute(taskCtx);
+//        }
+//        return taskStatus;
+//    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
new file mode 100644
index 0000000..76c9528
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
@@ -0,0 +1,11 @@
+package org.apache.airavata.worker.messaging.runner;
+
+import java.util.List;
+
+/**
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessagingRunner {
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
new file mode 100644
index 0000000..d33bc8b
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.worker.messaging.utils;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.worker.messaging.handler.WorkerMessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessagingFactory {
+
+    private final static Logger log = LoggerFactory.getLogger(WorkerMessagingFactory.class);
+    private final static Map<String, Subscriber> SUBSCRIBER_MAP;
+
+    static{
+        SUBSCRIBER_MAP = new HashMap<>();
+    }
+
+    public static Map<String, Subscriber> getSubscriberMap(){
+        return SUBSCRIBER_MAP;
+    }
+
+    public static final boolean createSubscribers(List<String> tasks) throws AiravataException {
+
+        log.info("Creating RabbitMQ subscribers for : " + tasks.toString());
+        for(String task : tasks){
+
+            log.info("Starting subscriber for task : " + task);
+            //adding subscriber to map
+            SUBSCRIBER_MAP.put(task, MessagingFactory.getWorkerEventSubscriber(new WorkerMessageHandler(), task));
+
+            log.debug("Subscriber started for task : " + task);
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/resources/worker-config.yaml b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
new file mode 100644
index 0000000..87b8310
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
@@ -0,0 +1,32 @@
+##################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+################################################################
+
+taskImplementations:
+  - task: ENV_SETUP
+    taskClass: org.apache.airavata.worker.task.envsetup.impl.EnvironmentSetupTask
+
+  - task: DATA_STAGING
+      taskClass: org.apache.airavata.worker.task.datastaging.impl.DataStageTask
+
+  - task: JOB_SUBMISSION
+      taskClass: org.apache.airavata.worker.task.jobsubmission.impl.DefaultJobSubmissionTask
+
+  - task: MONITORING
+      taskClass: org.apache.airavata.worker.task.envsetup.impl.EnvironmentSetupTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
new file mode 100644
index 0000000..b353ce7
--- /dev/null
+++ b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
@@ -0,0 +1,28 @@
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.messaging.utils.WorkerMessagingFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by Ajinkya on 5/3/17.
+ */
+public class TestWorkerMessaging {
+
+    @Test
+    public void testSubscriberCreation(){
+        List<String> tasks = new ArrayList<String>(){{add(TaskTypes.ENV_SETUP.toString());add(TaskTypes.DATA_STAGING.toString());}};
+        try {
+
+            WorkerMessagingFactory.createSubscribers(tasks);
+            Assert.assertEquals(WorkerMessagingFactory.getSubscriberMap().size(), tasks.size());
+
+        } catch (AiravataException e) {
+            Assert.fail("Fail to start subscribers.", e);
+        }
+    }
+
+}