You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:17 UTC
[18/50] [abbrv] airavata git commit: Merged queue-gfac-rabbitmq
Merged queue-gfac-rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/249b4401
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/249b4401
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/249b4401
Branch: refs/heads/master
Commit: 249b4401f0bfc821482dfbcb1d02eaf6832b9da1
Parents: 41ea946 840e627
Author: shamrath <sh...@gmail.com>
Authored: Tue Feb 24 10:37:29 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Tue Feb 24 10:37:29 2015 -0500
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 2 +-
.../lib/airavata/messagingEvents_types.cpp | 202 +++++-
.../lib/airavata/messagingEvents_types.h | 106 ++-
.../Airavata/Model/Messaging/Event/Types.php | 228 +++++++
.../client/samples/CreateLaunchExperiment.java | 19 +-
.../model/messaging/event/MessageType.java | 8 +-
.../model/messaging/event/TaskSubmitEvent.java | 684 +++++++++++++++++++
.../messaging/event/TaskTerminateEvent.java | 492 +++++++++++++
airavata-api/generate-thrift-files.sh | 22 +-
.../messagingEvents.thrift | 16 +-
.../airavata/common/utils/AiravataZKUtils.java | 22 +
.../airavata/common/utils/ServerSettings.java | 24 +-
.../main/resources/airavata-server.properties | 11 +-
.../main/resources/airavata-server.properties | 9 +-
modules/gfac/airavata-gfac-service/pom.xml | 10 +
.../airavata/gfac/server/GfacServerHandler.java | 122 +++-
modules/gfac/gfac-core/pom.xml | 1 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 2 +-
.../airavata/gfac/core/utils/GFacUtils.java | 116 +++-
.../messaging/core/MessagingConstants.java | 3 +-
.../messaging/core/PublisherFactory.java | 23 +-
.../airavata/messaging/core/TestClient.java | 5 +-
.../messaging/core/impl/RabbitMQConsumer.java | 258 -------
.../messaging/core/impl/RabbitMQProducer.java | 27 +-
.../messaging/core/impl/RabbitMQPublisher.java | 103 ---
.../core/impl/RabbitMQStatusConsumer.java | 274 ++++++++
.../core/impl/RabbitMQStatusPublisher.java | 103 +++
.../core/impl/RabbitMQTaskLaunchConsumer.java | 244 +++++++
.../core/impl/RabbitMQTaskLaunchPublisher.java | 85 +++
.../server/OrchestratorServerHandler.java | 3 +-
modules/orchestrator/orchestrator-core/pom.xml | 4 +-
.../core/context/OrchestratorContext.java | 11 +
.../core/impl/GFACPassiveJobSubmitter.java | 232 +++++++
.../core/impl/GFACRPCJobSubmitter.java | 212 ++++++
.../core/impl/GFACServiceJobSubmitter.java | 212 ------
.../core/utils/OrchestratorConstants.java | 1 -
.../workflow/engine/WorkflowEngineImpl.java | 4 +-
.../airavata/xbaya/messaging/Monitor.java | 5 +-
pom.xml | 7 +
39 files changed, 3255 insertions(+), 657 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 7fbb18b,1e9d983..dd8686d
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@@ -57,10 -60,10 +57,10 @@@ public class CreateLaunchExperiment
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_7d2a5cde-5b2a-4cad-ae50-f71668f4876d";
+ private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
- private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";
+ private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00";
private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
private static String lammpsAppId = "LAMMPS_10893eb5-3840-438c-8446-d26c7ecb001f";
http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --cc modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 0000000,fe06ed7..966d44d
mode 000000,100644..100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@@ -1,0 -1,99 +1,103 @@@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ package org.apache.airavata.messaging.core.impl;
+
+ import org.apache.airavata.common.exception.AiravataException;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.common.utils.ThriftUtils;
+ import org.apache.airavata.messaging.core.MessageContext;
+ import org.apache.airavata.messaging.core.MessagingConstants;
+ import org.apache.airavata.messaging.core.Publisher;
++import org.apache.airavata.messaging.core.stats.StatCounter;
+ import org.apache.airavata.model.messaging.event.*;
+ import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class RabbitMQStatusPublisher implements Publisher { // Publisher that wraps each MessageContext event in a Message and sends it through a RabbitMQProducer
+
+ private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
+
+ private RabbitMQProducer rabbitMQProducer; // created and opened in the constructor; used by publish()
+
++ StatCounter statCounter = StatCounter.getInstance(); // package-private; add() is invoked after each send that does not throw
+
+ public RabbitMQStatusPublisher() throws Exception { // reads broker URL and status exchange name from ServerSettings, then opens the producer
+ String brokerUrl;
+ String exchangeName;
+ try {
+ brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+ } catch (ApplicationSettingsException e) { // settings lookup failure is logged and rethrown wrapped in AiravataException
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+ rabbitMQProducer.open();
+ }
+
+ public void publish(MessageContext msgCtx) throws AiravataException { // serializes the event, wraps it in a Message, derives a routing key from the message type, and sends it
+ try {
+ log.info("Publishing status to rabbitmq...");
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); // serialized event becomes the Message payload
+ Message message = new Message();
+ message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); // NOTE(review): presumably epoch millis from a Date/Timestamp — confirm against MessageContext
++ String gatewayId = msgCtx.getGatewayId(); // prefixed onto every routing key built below
+ String routingKey = null; // stays null if the type matches none of the branches below
+ if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ // key: <gatewayId>.<experimentId>
+ ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
 - routingKey = event.getExperimentId();
++ routingKey = gatewayId + "." + event.getExperimentId();
+ } else if (msgCtx.getType().equals(MessageType.TASK)) { // key: <gatewayId>.<experimentId>.<workflowNodeId>.<taskId>
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
 - routingKey = event.getTaskIdentity().getExperimentId() + "." +
++ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+ event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+ }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ // key: <gatewayId>.<experimentId>.<workflowNodeId>
+ WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
+ WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
 - routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
++ routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+ }else if (msgCtx.getType().equals(MessageType.JOB)){ // key: <gatewayId>.<experimentId>.<workflowNodeId>.<taskId>.<jobId>
+ JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+ JobIdentifier identity = event.getJobIdentity();
 - routingKey = identity.getExperimentId() + "." +
++ routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+ identity.getWorkflowNodeId() + "." +
+ identity.getTaskId() + "." +
+ identity.getJobId();
+ }
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message); // the Message envelope itself is serialized before sending
+ rabbitMQProducer.send(messageBody, routingKey);
++ statCounter.add(); // not reached when send throws
+ } catch (TException e) { // NOTE(review): message below says "deserializing" but this try block only serializes — confirm wording
+ String msg = "Error while deserializing the object";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ } catch (Exception e) { // any other failure in the try block is logged and wrapped in AiravataException
+ String msg = "Error while sending to rabbitmq";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/pom.xml
----------------------------------------------------------------------