You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:17 UTC

[18/50] [abbrv] airavata git commit: Merged queue-gfac-rabbitmq

Merged queue-gfac-rabbitmq


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/249b4401
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/249b4401
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/249b4401

Branch: refs/heads/master
Commit: 249b4401f0bfc821482dfbcb1d02eaf6832b9da1
Parents: 41ea946 840e627
Author: shamrath <sh...@gmail.com>
Authored: Tue Feb 24 10:37:29 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Tue Feb 24 10:37:29 2015 -0500

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |   2 +-
 .../lib/airavata/messagingEvents_types.cpp      | 202 +++++-
 .../lib/airavata/messagingEvents_types.h        | 106 ++-
 .../Airavata/Model/Messaging/Event/Types.php    | 228 +++++++
 .../client/samples/CreateLaunchExperiment.java  |  19 +-
 .../model/messaging/event/MessageType.java      |   8 +-
 .../model/messaging/event/TaskSubmitEvent.java  | 684 +++++++++++++++++++
 .../messaging/event/TaskTerminateEvent.java     | 492 +++++++++++++
 airavata-api/generate-thrift-files.sh           |  22 +-
 .../messagingEvents.thrift                      |  16 +-
 .../airavata/common/utils/AiravataZKUtils.java  |  22 +
 .../airavata/common/utils/ServerSettings.java   |  24 +-
 .../main/resources/airavata-server.properties   |  11 +-
 .../main/resources/airavata-server.properties   |   9 +-
 modules/gfac/airavata-gfac-service/pom.xml      |  10 +
 .../airavata/gfac/server/GfacServerHandler.java | 122 +++-
 modules/gfac/gfac-core/pom.xml                  |   1 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |   2 +-
 .../airavata/gfac/core/utils/GFacUtils.java     | 116 +++-
 .../messaging/core/MessagingConstants.java      |   3 +-
 .../messaging/core/PublisherFactory.java        |  23 +-
 .../airavata/messaging/core/TestClient.java     |   5 +-
 .../messaging/core/impl/RabbitMQConsumer.java   | 258 -------
 .../messaging/core/impl/RabbitMQProducer.java   |  27 +-
 .../messaging/core/impl/RabbitMQPublisher.java  | 103 ---
 .../core/impl/RabbitMQStatusConsumer.java       | 274 ++++++++
 .../core/impl/RabbitMQStatusPublisher.java      | 103 +++
 .../core/impl/RabbitMQTaskLaunchConsumer.java   | 244 +++++++
 .../core/impl/RabbitMQTaskLaunchPublisher.java  |  85 +++
 .../server/OrchestratorServerHandler.java       |   3 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   4 +-
 .../core/context/OrchestratorContext.java       |  11 +
 .../core/impl/GFACPassiveJobSubmitter.java      | 232 +++++++
 .../core/impl/GFACRPCJobSubmitter.java          | 212 ++++++
 .../core/impl/GFACServiceJobSubmitter.java      | 212 ------
 .../core/utils/OrchestratorConstants.java       |   1 -
 .../workflow/engine/WorkflowEngineImpl.java     |   4 +-
 .../airavata/xbaya/messaging/Monitor.java       |   5 +-
 pom.xml                                         |   7 +
 39 files changed, 3255 insertions(+), 657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 7fbb18b,1e9d983..dd8686d
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@@ -57,10 -60,10 +57,10 @@@ public class CreateLaunchExperiment 
      private static final String DEFAULT_GATEWAY = "default.registry.gateway";
      private static Airavata.Client airavataClient;
  
-     private static String echoAppId = "Echo_7d2a5cde-5b2a-4cad-ae50-f71668f4876d";
+     private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
      private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
      private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
 -    private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";
 +    private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00";
      private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
      private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
      private static String lammpsAppId = "LAMMPS_10893eb5-3840-438c-8446-d26c7ecb001f";

http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --cc modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 0000000,fe06ed7..966d44d
mode 000000,100644..100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@@ -1,0 -1,99 +1,103 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+ 
+ package org.apache.airavata.messaging.core.impl;
+ 
+ import org.apache.airavata.common.exception.AiravataException;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.common.utils.ThriftUtils;
+ import org.apache.airavata.messaging.core.MessageContext;
+ import org.apache.airavata.messaging.core.MessagingConstants;
+ import org.apache.airavata.messaging.core.Publisher;
++import org.apache.airavata.messaging.core.stats.StatCounter;
+ import org.apache.airavata.model.messaging.event.*;
+ import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class RabbitMQStatusPublisher implements Publisher {
+ 
+     private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
+ 
+     private RabbitMQProducer rabbitMQProducer;
+ 
++    StatCounter statCounter = StatCounter.getInstance();
+ 
+     public RabbitMQStatusPublisher() throws Exception {
+         String brokerUrl;
+         String exchangeName;
+         try {
+             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+             exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+         } catch (ApplicationSettingsException e) {
+             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+             log.error(message, e);
+             throw new AiravataException(message, e);
+         }
+         rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+         rabbitMQProducer.open();
+     }
+ 
+     public void publish(MessageContext msgCtx) throws AiravataException {
+         try {
+             log.info("Publishing status to rabbitmq...");
+             byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+             Message message = new Message();
+             message.setEvent(body);
+             message.setMessageId(msgCtx.getMessageId());
+             message.setMessageType(msgCtx.getType());
+             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
++            String gatewayId = msgCtx.getGatewayId();
+             String routingKey = null;
+             if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
+                 ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
 -                routingKey = event.getExperimentId();
++                routingKey = gatewayId + "." + event.getExperimentId();
+             } else if (msgCtx.getType().equals(MessageType.TASK)) {
+                 TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
 -                routingKey = event.getTaskIdentity().getExperimentId() + "." +
++                routingKey =  gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+                         event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+             }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
+                 WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
+                 WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
 -                routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
++                routingKey =  gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+             }else if (msgCtx.getType().equals(MessageType.JOB)){
+                 JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+                 JobIdentifier identity = event.getJobIdentity();
 -                routingKey = identity.getExperimentId() + "." +
++                routingKey =  gatewayId + "." + identity.getExperimentId() + "." +
+                         identity.getWorkflowNodeId() + "." +
+                         identity.getTaskId() + "." +
+                         identity.getJobId();
+             }
+             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+             rabbitMQProducer.send(messageBody, routingKey);
++            statCounter.add();
+         } catch (TException e) {
+             String msg = "Error while deserializing the object";
+             log.error(msg, e);
+             throw new AiravataException(msg, e);
+         } catch (Exception e) {
+             String msg = "Error while sending to rabbitmq";
+             log.error(msg, e);
+             throw new AiravataException(msg, e);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/pom.xml
----------------------------------------------------------------------