You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/05/09 16:40:59 UTC
[3/7] airavata git commit: Adding common messaging module for workers
Adding common messaging module for workers
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/089ef37b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/089ef37b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/089ef37b
Branch: refs/heads/feature-workload-mgmt
Commit: 089ef37b31db8de2964b5913b5e8e58fcf610aec
Parents: 46cdeb3
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Tue May 9 12:36:49 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Tue May 9 12:36:49 2017 -0400
----------------------------------------------------------------------
.../worker/task/envsetup/messaging/sample | 1 -
modules/worker/worker-messaging/pom.xml | 41 +++++++++++++++++
.../messaging/handler/WorkerMessageHandler.java | 48 ++++++++++++++++++++
.../worker/messaging/impl/WorkerEngineImpl.java | 22 +++++++++
.../messaging/runner/WorkerMessagingRunner.java | 11 +++++
.../messaging/utils/WorkerMessagingFactory.java | 43 ++++++++++++++++++
.../src/main/resources/worker-config.yaml | 32 +++++++++++++
.../src/test/java/TestWorkerMessaging.java | 28 ++++++++++++
8 files changed, 225 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
----------------------------------------------------------------------
diff --git a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
deleted file mode 100644
index f6c4fd0..0000000
--- a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/messaging/sample
+++ /dev/null
@@ -1 +0,0 @@
-delete me
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/pom.xml b/modules/worker/worker-messaging/pom.xml
new file mode 100644
index 0000000..49793a3
--- /dev/null
+++ b/modules/worker/worker-messaging/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata-worker</artifactId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-worker-messaging</artifactId>
+    <name>Airavata Task Messaging</name>
+    <description>Handling Task messaging</description>
+    <url>http://airavata.apache.org/</url>
+
+    <properties>
+        <testng.version>6.1.1</testng.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-worker-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope> <!-- TestNG is referenced only from src/test -->
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
new file mode 100644
index 0000000..49fa609
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/handler/WorkerMessageHandler.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.worker.messaging.handler;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles messages delivered to a worker subscriber; per-task actions are still TODO.
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessageHandler implements MessageHandler {
+
+    private final static Logger log = LoggerFactory.getLogger(WorkerMessageHandler.class);
+
+    /** Serializes the received event, then branches on a new TaskContext's task type; TException is logged. */
+    @Override
+    public void onMessage(MessageContext messageContext) {
+        log.info("Worker received message. Message Id : {}", messageContext.getMessageId());
+        try {
+            TBase messageEvent = messageContext.getEvent();
+            // Not consumed yet; input for the disabled createThriftFromBytes step below.
+            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+            //ThriftUtils.createThriftFromBytes(bytes, event);
+            // NOTE(review): nothing sets a task type on this fresh context, so getTaskType()
+            // presumably returns null and the switch would throw NPE - TODO build from the event.
+            TaskContext taskContext = new TaskContext();
+
+            switch (taskContext.getTaskType()) {
+                case ENV_SETUP: //TODO: call environment setup
+                    break;
+                case DATA_STAGING: //TODO: call data staging
+                    break;
+                case JOB_SUBMISSION: //TODO: call job submission
+                    break;
+                case MONITORING: //TODO: call monitoring
+                    break;
+            }
+        } catch (TException e) {
+            // Passing e as the last argument keeps the cause and stack trace in the log.
+            log.error("Error processing messaging. Message Id : {}", messageContext.getMessageId(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
new file mode 100644
index 0000000..441053d
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/impl/WorkerEngineImpl.java
@@ -0,0 +1,22 @@
+package org.apache.airavata.worker.messaging.impl;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+
+/**
+ * Placeholder worker engine: the class declares no members yet. Created by Ajinkya on 5/2/17.
+ */
+public class WorkerEngineImpl {
+// Disabled draft of task execution (recover vs. execute); kept for reference.
+// private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws WorkerException {
+// TaskStatus taskStatus = null;
+// if (recover) {
+// taskStatus = task.recover(taskCtx);
+// } else {
+// taskStatus = task.execute(taskCtx);
+// }
+// return taskStatus;
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
new file mode 100644
index 0000000..76c9528
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/runner/WorkerMessagingRunner.java
@@ -0,0 +1,11 @@
+package org.apache.airavata.worker.messaging.runner;
+
+import java.util.List;
+
+/**
+ * Placeholder for the worker messaging runner; no members yet. Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessagingRunner {
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
new file mode 100644
index 0000000..d33bc8b
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/java/org/apache/airavata/worker/messaging/utils/WorkerMessagingFactory.java
@@ -0,0 +1,43 @@
+package org.apache.airavata.worker.messaging.utils;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.worker.messaging.handler.WorkerMessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Creates and tracks one worker event subscriber per task name.
+ * Created by Ajinkya on 5/2/17.
+ */
+public class WorkerMessagingFactory {
+    private final static Logger log = LoggerFactory.getLogger(WorkerMessagingFactory.class);
+    private final static Map<String, Subscriber> SUBSCRIBER_MAP = new HashMap<>();
+
+    private WorkerMessagingFactory() { } // static utility, never instantiated
+
+    /** Returns the live, mutable subscriber registry keyed by task name. */
+    public static Map<String, Subscriber> getSubscriberMap(){
+        return SUBSCRIBER_MAP;
+    }
+
+    /** Subscribes to each task without a subscriber (existing ones are kept); always returns true. */
+    public static final boolean createSubscribers(List<String> tasks) throws AiravataException {
+        log.info("Creating RabbitMQ subscribers for : {}", tasks);
+        for (String task : tasks) {
+            if (SUBSCRIBER_MAP.containsKey(task)) {
+                // put() would replace, and thereby orphan, the subscriber already registered
+                log.warn("Subscriber already exists for task : {}", task);
+                continue;
+            }
+            log.info("Starting subscriber for task : {}", task);
+            SUBSCRIBER_MAP.put(task, MessagingFactory.getWorkerEventSubscriber(new WorkerMessageHandler(), task));
+            log.debug("Subscriber started for task : {}", task);
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/main/resources/worker-config.yaml b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
new file mode 100644
index 0000000..87b8310
--- /dev/null
+++ b/modules/worker/worker-messaging/src/main/resources/worker-config.yaml
@@ -0,0 +1,32 @@
+##################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+################################################################
+
+taskImplementations:  # each entry pairs a task type with the class configured for it
+ - task: ENV_SETUP
+ taskClass: org.apache.airavata.worker.task.envsetup.impl.EnvironmentSetupTask
+
+ - task: DATA_STAGING
+ taskClass: org.apache.airavata.worker.task.datastaging.impl.DataStageTask
+
+ - task: JOB_SUBMISSION
+ taskClass: org.apache.airavata.worker.task.jobsubmission.impl.DefaultJobSubmissionTask
+ # NOTE(review): MONITORING reuses the ENV_SETUP task class below - looks like a placeholder; confirm.
+ - task: MONITORING
+ taskClass: org.apache.airavata.worker.task.envsetup.impl.EnvironmentSetupTask
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/089ef37b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
new file mode 100644
index 0000000..b353ce7
--- /dev/null
+++ b/modules/worker/worker-messaging/src/test/java/TestWorkerMessaging.java
@@ -0,0 +1,28 @@
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.messaging.utils.WorkerMessagingFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Checks that WorkerMessagingFactory registers one subscriber per requested task.
+ * Created by Ajinkya on 5/3/17.
+ */
+public class TestWorkerMessaging {
+    @Test
+    public void testSubscriberCreation(){
+        // Plain adds instead of double-brace initialization, which creates an anonymous ArrayList subclass.
+        List<String> tasks = new ArrayList<>();
+        tasks.add(TaskTypes.ENV_SETUP.toString());
+        tasks.add(TaskTypes.DATA_STAGING.toString());
+        try {
+            WorkerMessagingFactory.createSubscribers(tasks);
+            Assert.assertEquals(WorkerMessagingFactory.getSubscriberMap().size(), tasks.size());
+        } catch (AiravataException e) {
+            Assert.fail("Fail to start subscribers.", e);
+        }
+    }
+}