You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/08/26 15:43:02 UTC
[07/50] [abbrv] airavata git commit: ApiServer publish experiment
submit events
ApiServer publish experiment submit events
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4e3dc9a9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4e3dc9a9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4e3dc9a9
Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 4e3dc9a9e90e6043173a98d6f546b25ff7f2896a
Parents: 63696ff
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 18:38:23 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 18:38:23 2016 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 154 +++---
.../lib/airavata/messaging_events_types.cpp | 227 ++++++---
.../lib/airavata/messaging_events_types.h | 60 ++-
.../Airavata/Model/Messaging/Event/Types.php | 124 ++++-
.../airavata/model/messaging/event/ttypes.py | 121 ++++-
.../messaging/event/ExperimentSubmitEvent.java | 507 +++++++++++++++++++
.../model/messaging/event/MessageType.java | 25 +-
.../messaging/core/MessagingFactory.java | 20 +-
.../messaging/core/impl/ExperimentConsumer.java | 74 ++-
.../messaging/core/impl/ProcessConsumer.java | 6 +-
.../core/impl/GFACPassiveJobSubmitter.java | 2 +-
.../airavata-apis/messaging_events.thrift | 6 +
12 files changed, 1153 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index ff0149e..cd21124 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -48,7 +48,14 @@ import org.apache.airavata.model.WorkflowModel;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationModule;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.CloudJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
@@ -57,16 +64,33 @@ import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.airavata_commonsConstants;
import org.apache.airavata.model.data.movement.DMType;
-import org.apache.airavata.model.data.movement.*;
+import org.apache.airavata.model.data.movement.DataMovementInterface;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.data.movement.GridFTPDataMovement;
+import org.apache.airavata.model.data.movement.LOCALDataMovement;
+import org.apache.airavata.model.data.movement.SCPDataMovement;
+import org.apache.airavata.model.data.movement.UnicoreDataMovement;
import org.apache.airavata.model.data.replica.DataProductModel;
import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
-import org.apache.airavata.model.error.*;
-import org.apache.airavata.model.experiment.*;
+import org.apache.airavata.model.error.AiravataClientException;
+import org.apache.airavata.model.error.AiravataErrorType;
+import org.apache.airavata.model.error.AiravataSystemException;
+import org.apache.airavata.model.error.AuthorizationException;
+import org.apache.airavata.model.error.ExperimentNotFoundException;
+import org.apache.airavata.model.error.InvalidRequestException;
+import org.apache.airavata.model.error.ProjectNotFoundException;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.experiment.ExperimentSearchFields;
+import org.apache.airavata.model.experiment.ExperimentStatistics;
+import org.apache.airavata.model.experiment.ExperimentSummaryModel;
+import org.apache.airavata.model.experiment.ProjectSearchFields;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
import org.apache.airavata.model.group.GroupModel;
import org.apache.airavata.model.group.ResourcePermissionType;
import org.apache.airavata.model.group.ResourceType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.security.AuthzToken;
@@ -76,14 +100,16 @@ import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.workspace.Gateway;
import org.apache.airavata.model.workspace.Notification;
import org.apache.airavata.model.workspace.Project;
-import org.apache.airavata.orchestrator.client.OrchestratorClientFactory;
-import org.apache.airavata.orchestrator.cpi.OrchestratorService;
-import org.apache.airavata.orchestrator.cpi.OrchestratorService.Client;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,15 +118,18 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
public class AiravataServerHandler implements Airavata.Iface {
private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
- private Publisher publisher;
+ private Publisher statusPublisher;
+ private Publisher experimentPublisher;
private CredentialStoreService.Client csClient;
public AiravataServerHandler() {
try {
- publisher = MessagingFactory.getPublisher(Type.STATUS);
+ statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+ experimentPublisher = MessagingFactory.getPublisher(Type.EXPERIMENT_LAUNCH);
} catch (ApplicationSettingsException e) {
logger.error("Error occured while reading airavata-server properties..", e);
} catch (AiravataException e) {
@@ -365,7 +394,7 @@ public class AiravataServerHandler implements Airavata.Iface {
*
* @param authzToken
* @param gatewayId The identifier for the requested Gateway.
- * @param userName The User for which the credential should be registered. For community accounts, this user is the name of the
+ * @param portalUserName The User for which the credential should be registered. For community accounts, this user is the name of the
* community user name. For computational resources, this user name need not be the same user name on resoruces.
* @param password
* @return airavataCredStoreToken
@@ -867,8 +896,8 @@ public class AiravataServerHandler implements Airavata.Iface {
String messageId = AiravataUtils.getId("EXPERIMENT");
MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- if(publisher!=null) {
- publisher.publish(messageContext);
+ if(statusPublisher !=null) {
+ statusPublisher.publish(messageContext);
}
logger.debug(experimentId, "Created new experiment with experiment name {}", experiment.getExperimentName());
return experimentId;
@@ -1122,9 +1151,9 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
@SecurityCheck
- public boolean validateExperiment(AuthzToken authzToken, String airavataExperimentId) throws InvalidRequestException,
- ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException {
- try {
+ public boolean validateExperiment(AuthzToken authzToken, String airavataExperimentId) throws TException {
+ // TODO - call validation module and validate experiment
+/* try {
ExperimentModel experimentModel = getRegistryServiceClient().getExperiment(airavataExperimentId);
if (experimentModel == null) {
logger.error(airavataExperimentId, "Experiment validation failed , experiment {} doesn't exist.", airavataExperimentId);
@@ -1151,9 +1180,9 @@ public class AiravataServerHandler implements Airavata.Iface {
}finally {
orchestratorClient.getOutputProtocol().getTransport().close();
orchestratorClient.getInputProtocol().getTransport().close();
- }
-
+ }*/
+ return true;
}
/**
@@ -1180,8 +1209,7 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
@SecurityCheck
- public ExperimentStatus getExperimentStatus(AuthzToken authzToken, String airavataExperimentId) throws InvalidRequestException,
- ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException {
+ public ExperimentStatus getExperimentStatus(AuthzToken authzToken, String airavataExperimentId) throws TException {
try {
return getRegistryServiceClient().getExperimentStatus(airavataExperimentId);
} catch (ApplicationSettingsException e) {
@@ -1274,50 +1302,37 @@ public class AiravataServerHandler implements Airavata.Iface {
@Override
@SecurityCheck
public void launchExperiment(AuthzToken authzToken, final String airavataExperimentId, String gatewayId)
- throws AuthorizationException, TException {
- try {
+ throws TException {
+ try {
ExperimentModel experiment = getRegistryServiceClient().getExperiment(airavataExperimentId);
if (experiment == null) {
logger.error(airavataExperimentId, "Error while launching experiment, experiment {} doesn't exist.", airavataExperimentId);
throw new ExperimentNotFoundException("Requested experiment id " + airavataExperimentId + " does not exist in the system..");
}
-// FIXME
-// String applicationID = experiment.getExecutionId();
-// if (!appCatalog.getApplicationInterface().isApplicationInterfaceExists(applicationID)){
-// logger.error(airavataExperimentId, "Error while launching experiment, application id {} for experiment {} doesn't exist.", applicationID, airavataExperimentId);
-// AiravataSystemException exception = new AiravataSystemException();
-// exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
-// exception.setMessage("Error while launching experiment, application id : " + applicationID + " for experiment : " + airavataExperimentId +
-// " doesn't exist..");
-// throw exception;
-// }
- OrchestratorService.Client orchestratorClient = getOrchestratorClient();
- if (orchestratorClient.validateExperiment(airavataExperimentId)) {
- orchestratorClient.launchExperiment(airavataExperimentId, gatewayId);
- logger.debug("Airavata launched experiment with experiment id : " + airavataExperimentId);
- }else {
- logger.error(airavataExperimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", airavataExperimentId);
- throw new InvalidRequestException("Experiment '" + airavataExperimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId());
- }
+ submitExperiment(gatewayId, airavataExperimentId);
} catch (RegistryServiceException | ApplicationSettingsException e1) {
logger.error(airavataExperimentId, "Error while instantiate the registry instance", e1);
AiravataSystemException exception = new AiravataSystemException();
exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
exception.setMessage("Error while instantiate the registry instance. More info : " + e1.getMessage());
throw exception;
+ } catch (AiravataException ex) {
+ logger.error("Experiment publish event fails", ex);
+
}
}
- private OrchestratorService.Client getOrchestratorClient() throws TException {
- try {
- final String serverHost = ServerSettings.getOrchestratorServerHost();
- final int serverPort = ServerSettings.getOrchestratorServerPort();
- return OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
- } catch (AiravataException e) {
- throw new TException(e);
- }
- }
+
+// private OrchestratorService.Client getOrchestratorClient() throws TException {
+// try {
+// final String serverHost = ServerSettings.getOrchestratorServerHost();
+// final int serverPort = ServerSettings.getOrchestratorServerPort();
+// return OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
+// } catch (AiravataException e) {
+// throw new TException(e);
+// }
+// }
/**
* Clone an specified experiment with a new name. A copy of the experiment configuration is made and is persisted with new metadata.
@@ -1437,26 +1452,34 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
@SecurityCheck
- public void terminateExperiment(AuthzToken authzToken, String airavataExperimentId, String gatewayId) throws InvalidRequestException,
- ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException {
+ public void terminateExperiment(AuthzToken authzToken, String airavataExperimentId, String gatewayId)
+ throws TException {
try {
RegistryService.Client regClient = getRegistryServiceClient();
ExperimentModel existingExperiment = regClient.getExperiment(airavataExperimentId);
if (existingExperiment == null){
- logger.error(airavataExperimentId, "Error while cloning experiment {}, experiment doesn't exist.", airavataExperimentId);
+ logger.error(airavataExperimentId, "Error while cancelling experiment {}, experiment doesn't exist.", airavataExperimentId);
throw new ExperimentNotFoundException("Requested experiment id " + airavataExperimentId + " does not exist in the system..");
}
+ ExperimentStatus experimentStatus = null;
+ switch (experimentStatus.getState()) {
+ case COMPLETED: case CANCELED: case FAILED: case CANCELING:
+ logger.warn("Can't terminate already {} experiment", experimentStatus.getState().name());
+ case CREATED:
+ logger.warn("Experiment termination is only allowed for launched experiments.");
+ default:
+ submitCancelExperiment(airavataExperimentId, gatewayId);
- Client client = getOrchestratorClient();
- client.terminateExperiment(airavataExperimentId, gatewayId);
+ }
logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId);
- } catch (RegistryServiceException | ApplicationSettingsException e) {
+ } catch (RegistryServiceException | AiravataException e) {
logger.error(airavataExperimentId, "Error while cancelling the experiment...", e);
AiravataSystemException exception = new AiravataSystemException();
exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
exception.setMessage("Error while cancelling the experiment. More info : " + e.getMessage());
throw exception;
}
+
}
/**
@@ -2902,8 +2925,9 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
@SecurityCheck
- public boolean updateGatewayResourceProfile(AuthzToken authzToken, String gatewayID, GatewayResourceProfile gatewayResourceProfile)
- throws InvalidRequestException, AiravataClientException, AiravataSystemException, AuthorizationException, TException {
+ public boolean updateGatewayResourceProfile(AuthzToken authzToken,
+ String gatewayID,
+ GatewayResourceProfile gatewayResourceProfile) throws TException {
try {
return getRegistryServiceClient().updateGatewayResourceProfile(gatewayID, gatewayResourceProfile);
} catch (ApplicationSettingsException | RegistryServiceException e) {
@@ -2924,8 +2948,7 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
@SecurityCheck
- public boolean deleteGatewayResourceProfile(AuthzToken authzToken, String gatewayID) throws InvalidRequestException,
- AiravataClientException, AiravataSystemException, AuthorizationException, TException {
+ public boolean deleteGatewayResourceProfile(AuthzToken authzToken, String gatewayID) throws TException {
try {
return getRegistryServiceClient().deleteGatewayResourceProfile(gatewayID);
} catch (ApplicationSettingsException | RegistryServiceException e) {
@@ -3652,6 +3675,19 @@ public class AiravataServerHandler implements Airavata.Iface {
return allAccessibleResources;
}
+
+ private void submitExperiment(String gatewayId,String experimentId) throws AiravataException {
+ ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId);
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, "LAUNCH.EXP-" + UUID.randomUUID().toString(), gatewayId);
+ experimentPublisher.publish(messageContext);
+ }
+
+ private void submitCancelExperiment(String gatewayId, String experimentId) throws AiravataException {
+ ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId);
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_CANCEL, "CANCEL.EXP-" + UUID.randomUUID().toString(), gatewayId);
+ experimentPublisher.publish(messageContext);
+ }
+
private CredentialStoreService.Client getCredentialStoreServiceClient() throws TException, ApplicationSettingsException {
final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
final String serverHost = ServerSettings.getCredentialStoreServerHost();
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp
index 8317b71..f7d5d87 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp
@@ -46,6 +46,7 @@ const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES(::apache::thrift:
int _kMessageTypeValues[] = {
MessageType::EXPERIMENT,
+ MessageType::EXPERIMENT_CANCEL,
MessageType::TASK,
MessageType::PROCESS,
MessageType::JOB,
@@ -55,6 +56,7 @@ int _kMessageTypeValues[] = {
};
const char* _kMessageTypeNames[] = {
"EXPERIMENT",
+ "EXPERIMENT_CANCEL",
"TASK",
"PROCESS",
"JOB",
@@ -62,7 +64,7 @@ const char* _kMessageTypeNames[] = {
"TERMINATEPROCESS",
"PROCESSOUTPUT"
};
-const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
ExperimentStatusChangeEvent::~ExperimentStatusChangeEvent() throw() {
@@ -1237,6 +1239,115 @@ void JobIdentifier::printTo(std::ostream& out) const {
}
+ExperimentSubmitEvent::~ExperimentSubmitEvent() throw() {
+}
+
+
+void ExperimentSubmitEvent::__set_experimentId(const std::string& val) {
+ this->experimentId = val;
+}
+
+void ExperimentSubmitEvent::__set_gatewayId(const std::string& val) {
+ this->gatewayId = val;
+}
+
+uint32_t ExperimentSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_experimentId = false;
+ bool isset_gatewayId = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->experimentId);
+ isset_experimentId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->gatewayId);
+ isset_gatewayId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_experimentId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_gatewayId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t ExperimentSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("ExperimentSubmitEvent");
+
+ xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->experimentId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("gatewayId", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->gatewayId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(ExperimentSubmitEvent &a, ExperimentSubmitEvent &b) {
+ using ::std::swap;
+ swap(a.experimentId, b.experimentId);
+ swap(a.gatewayId, b.gatewayId);
+}
+
+ExperimentSubmitEvent::ExperimentSubmitEvent(const ExperimentSubmitEvent& other29) {
+ experimentId = other29.experimentId;
+ gatewayId = other29.gatewayId;
+}
+ExperimentSubmitEvent& ExperimentSubmitEvent::operator=(const ExperimentSubmitEvent& other30) {
+ experimentId = other30.experimentId;
+ gatewayId = other30.gatewayId;
+ return *this;
+}
+void ExperimentSubmitEvent::printTo(std::ostream& out) const {
+ using ::apache::thrift::to_string;
+ out << "ExperimentSubmitEvent(";
+ out << "experimentId=" << to_string(experimentId);
+ out << ", " << "gatewayId=" << to_string(gatewayId);
+ out << ")";
+}
+
+
ProcessSubmitEvent::~ProcessSubmitEvent() throw() {
}
@@ -1368,17 +1479,17 @@ void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b) {
swap(a.tokenId, b.tokenId);
}
-ProcessSubmitEvent::ProcessSubmitEvent(const ProcessSubmitEvent& other29) {
- processId = other29.processId;
- gatewayId = other29.gatewayId;
- experimentId = other29.experimentId;
- tokenId = other29.tokenId;
+ProcessSubmitEvent::ProcessSubmitEvent(const ProcessSubmitEvent& other31) {
+ processId = other31.processId;
+ gatewayId = other31.gatewayId;
+ experimentId = other31.experimentId;
+ tokenId = other31.tokenId;
}
-ProcessSubmitEvent& ProcessSubmitEvent::operator=(const ProcessSubmitEvent& other30) {
- processId = other30.processId;
- gatewayId = other30.gatewayId;
- experimentId = other30.experimentId;
- tokenId = other30.tokenId;
+ProcessSubmitEvent& ProcessSubmitEvent::operator=(const ProcessSubmitEvent& other32) {
+ processId = other32.processId;
+ gatewayId = other32.gatewayId;
+ experimentId = other32.experimentId;
+ tokenId = other32.tokenId;
return *this;
}
void ProcessSubmitEvent::printTo(std::ostream& out) const {
@@ -1503,15 +1614,15 @@ void swap(ProcessTerminateEvent &a, ProcessTerminateEvent &b) {
swap(a.tokenId, b.tokenId);
}
-ProcessTerminateEvent::ProcessTerminateEvent(const ProcessTerminateEvent& other31) {
- processId = other31.processId;
- gatewayId = other31.gatewayId;
- tokenId = other31.tokenId;
+ProcessTerminateEvent::ProcessTerminateEvent(const ProcessTerminateEvent& other33) {
+ processId = other33.processId;
+ gatewayId = other33.gatewayId;
+ tokenId = other33.tokenId;
}
-ProcessTerminateEvent& ProcessTerminateEvent::operator=(const ProcessTerminateEvent& other32) {
- processId = other32.processId;
- gatewayId = other32.gatewayId;
- tokenId = other32.tokenId;
+ProcessTerminateEvent& ProcessTerminateEvent::operator=(const ProcessTerminateEvent& other34) {
+ processId = other34.processId;
+ gatewayId = other34.gatewayId;
+ tokenId = other34.tokenId;
return *this;
}
void ProcessTerminateEvent::printTo(std::ostream& out) const {
@@ -1561,9 +1672,9 @@ uint32_t JobStatusChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot
{
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast33;
- xfer += iprot->readI32(ecast33);
- this->state = ( ::apache::airavata::model::status::JobState::type)ecast33;
+ int32_t ecast35;
+ xfer += iprot->readI32(ecast35);
+ this->state = ( ::apache::airavata::model::status::JobState::type)ecast35;
isset_state = true;
} else {
xfer += iprot->skip(ftype);
@@ -1617,13 +1728,13 @@ void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b) {
swap(a.jobIdentity, b.jobIdentity);
}
-JobStatusChangeEvent::JobStatusChangeEvent(const JobStatusChangeEvent& other34) {
- state = other34.state;
- jobIdentity = other34.jobIdentity;
+JobStatusChangeEvent::JobStatusChangeEvent(const JobStatusChangeEvent& other36) {
+ state = other36.state;
+ jobIdentity = other36.jobIdentity;
}
-JobStatusChangeEvent& JobStatusChangeEvent::operator=(const JobStatusChangeEvent& other35) {
- state = other35.state;
- jobIdentity = other35.jobIdentity;
+JobStatusChangeEvent& JobStatusChangeEvent::operator=(const JobStatusChangeEvent& other37) {
+ state = other37.state;
+ jobIdentity = other37.jobIdentity;
return *this;
}
void JobStatusChangeEvent::printTo(std::ostream& out) const {
@@ -1672,9 +1783,9 @@ uint32_t JobStatusChangeRequestEvent::read(::apache::thrift::protocol::TProtocol
{
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast36;
- xfer += iprot->readI32(ecast36);
- this->state = ( ::apache::airavata::model::status::JobState::type)ecast36;
+ int32_t ecast38;
+ xfer += iprot->readI32(ecast38);
+ this->state = ( ::apache::airavata::model::status::JobState::type)ecast38;
isset_state = true;
} else {
xfer += iprot->skip(ftype);
@@ -1728,13 +1839,13 @@ void swap(JobStatusChangeRequestEvent &a, JobStatusChangeRequestEvent &b) {
swap(a.jobIdentity, b.jobIdentity);
}
-JobStatusChangeRequestEvent::JobStatusChangeRequestEvent(const JobStatusChangeRequestEvent& other37) {
- state = other37.state;
- jobIdentity = other37.jobIdentity;
+JobStatusChangeRequestEvent::JobStatusChangeRequestEvent(const JobStatusChangeRequestEvent& other39) {
+ state = other39.state;
+ jobIdentity = other39.jobIdentity;
}
-JobStatusChangeRequestEvent& JobStatusChangeRequestEvent::operator=(const JobStatusChangeRequestEvent& other38) {
- state = other38.state;
- jobIdentity = other38.jobIdentity;
+JobStatusChangeRequestEvent& JobStatusChangeRequestEvent::operator=(const JobStatusChangeRequestEvent& other40) {
+ state = other40.state;
+ jobIdentity = other40.jobIdentity;
return *this;
}
void JobStatusChangeRequestEvent::printTo(std::ostream& out) const {
@@ -1814,9 +1925,9 @@ uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) {
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast39;
- xfer += iprot->readI32(ecast39);
- this->messageType = (MessageType::type)ecast39;
+ int32_t ecast41;
+ xfer += iprot->readI32(ecast41);
+ this->messageType = (MessageType::type)ecast41;
isset_messageType = true;
} else {
xfer += iprot->skip(ftype);
@@ -1832,9 +1943,9 @@ uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) {
break;
case 5:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast40;
- xfer += iprot->readI32(ecast40);
- this->messageLevel = (MessageLevel::type)ecast40;
+ int32_t ecast42;
+ xfer += iprot->readI32(ecast42);
+ this->messageLevel = (MessageLevel::type)ecast42;
this->__isset.messageLevel = true;
} else {
xfer += iprot->skip(ftype);
@@ -1900,21 +2011,21 @@ void swap(Message &a, Message &b) {
swap(a.__isset, b.__isset);
}
-Message::Message(const Message& other41) {
- event = other41.event;
- messageId = other41.messageId;
- messageType = other41.messageType;
- updatedTime = other41.updatedTime;
- messageLevel = other41.messageLevel;
- __isset = other41.__isset;
-}
-Message& Message::operator=(const Message& other42) {
- event = other42.event;
- messageId = other42.messageId;
- messageType = other42.messageType;
- updatedTime = other42.updatedTime;
- messageLevel = other42.messageLevel;
- __isset = other42.__isset;
+Message::Message(const Message& other43) {
+ event = other43.event;
+ messageId = other43.messageId;
+ messageType = other43.messageType;
+ updatedTime = other43.updatedTime;
+ messageLevel = other43.messageLevel;
+ __isset = other43.__isset;
+}
+Message& Message::operator=(const Message& other44) {
+ event = other44.event;
+ messageId = other44.messageId;
+ messageType = other44.messageType;
+ updatedTime = other44.updatedTime;
+ messageLevel = other44.messageLevel;
+ __isset = other44.__isset;
return *this;
}
void Message::printTo(std::ostream& out) const {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h
index d5a2411..15caed1 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h
@@ -53,12 +53,13 @@ extern const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES;
struct MessageType {
enum type {
EXPERIMENT = 0,
- TASK = 1,
- PROCESS = 2,
- JOB = 3,
- LAUNCHPROCESS = 4,
- TERMINATEPROCESS = 5,
- PROCESSOUTPUT = 6
+ EXPERIMENT_CANCEL = 1,
+ TASK = 2,
+ PROCESS = 3,
+ JOB = 4,
+ LAUNCHPROCESS = 5,
+ TERMINATEPROCESS = 6,
+ PROCESSOUTPUT = 7
};
};
@@ -82,6 +83,8 @@ class TaskOutputChangeEvent;
class JobIdentifier;
+class ExperimentSubmitEvent;
+
class ProcessSubmitEvent;
class ProcessTerminateEvent;
@@ -533,6 +536,51 @@ inline std::ostream& operator<<(std::ostream& out, const JobIdentifier& obj)
}
+class ExperimentSubmitEvent {
+ public:
+
+ ExperimentSubmitEvent(const ExperimentSubmitEvent&);
+ ExperimentSubmitEvent& operator=(const ExperimentSubmitEvent&);
+ ExperimentSubmitEvent() : experimentId(), gatewayId() {
+ }
+
+ virtual ~ExperimentSubmitEvent() throw();
+ std::string experimentId;
+ std::string gatewayId;
+
+ void __set_experimentId(const std::string& val);
+
+ void __set_gatewayId(const std::string& val);
+
+ bool operator == (const ExperimentSubmitEvent & rhs) const
+ {
+ if (!(experimentId == rhs.experimentId))
+ return false;
+ if (!(gatewayId == rhs.gatewayId))
+ return false;
+ return true;
+ }
+ bool operator != (const ExperimentSubmitEvent &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const ExperimentSubmitEvent & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+ virtual void printTo(std::ostream& out) const;
+};
+
+void swap(ExperimentSubmitEvent &a, ExperimentSubmitEvent &b);
+
+inline std::ostream& operator<<(std::ostream& out, const ExperimentSubmitEvent& obj)
+{
+ obj.printTo(out);
+ return out;
+}
+
+
class ProcessSubmitEvent {
public:
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index 9c6a1e8..96a015e 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -32,20 +32,22 @@ final class MessageLevel {
final class MessageType {
const EXPERIMENT = 0;
- const TASK = 1;
- const PROCESS = 2;
- const JOB = 3;
- const LAUNCHPROCESS = 4;
- const TERMINATEPROCESS = 5;
- const PROCESSOUTPUT = 6;
+ const EXPERIMENT_CANCEL = 1;
+ const TASK = 2;
+ const PROCESS = 3;
+ const JOB = 4;
+ const LAUNCHPROCESS = 5;
+ const TERMINATEPROCESS = 6;
+ const PROCESSOUTPUT = 7;
static public $__names = array(
0 => 'EXPERIMENT',
- 1 => 'TASK',
- 2 => 'PROCESS',
- 3 => 'JOB',
- 4 => 'LAUNCHPROCESS',
- 5 => 'TERMINATEPROCESS',
- 6 => 'PROCESSOUTPUT',
+ 1 => 'EXPERIMENT_CANCEL',
+ 2 => 'TASK',
+ 3 => 'PROCESS',
+ 4 => 'JOB',
+ 5 => 'LAUNCHPROCESS',
+ 6 => 'TERMINATEPROCESS',
+ 7 => 'PROCESSOUTPUT',
);
}
@@ -1145,6 +1147,104 @@ class JobIdentifier {
}
+class ExperimentSubmitEvent {
+ static $_TSPEC;
+
+ /**
+ * @var string
+ */
+ public $experimentId = null;
+ /**
+ * @var string
+ */
+ public $gatewayId = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'experimentId',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'gatewayId',
+ 'type' => TType::STRING,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['experimentId'])) {
+ $this->experimentId = $vals['experimentId'];
+ }
+ if (isset($vals['gatewayId'])) {
+ $this->gatewayId = $vals['gatewayId'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ExperimentSubmitEvent';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->experimentId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->gatewayId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ExperimentSubmitEvent');
+ if ($this->experimentId !== null) {
+ $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1);
+ $xfer += $output->writeString($this->experimentId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->gatewayId !== null) {
+ $xfer += $output->writeFieldBegin('gatewayId', TType::STRING, 2);
+ $xfer += $output->writeString($this->gatewayId);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
class ProcessSubmitEvent {
static $_TSPEC;
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py
index 312e07a..818841d 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py
@@ -42,31 +42,34 @@ class MessageLevel:
class MessageType:
EXPERIMENT = 0
- TASK = 1
- PROCESS = 2
- JOB = 3
- LAUNCHPROCESS = 4
- TERMINATEPROCESS = 5
- PROCESSOUTPUT = 6
+ EXPERIMENT_CANCEL = 1
+ TASK = 2
+ PROCESS = 3
+ JOB = 4
+ LAUNCHPROCESS = 5
+ TERMINATEPROCESS = 6
+ PROCESSOUTPUT = 7
_VALUES_TO_NAMES = {
0: "EXPERIMENT",
- 1: "TASK",
- 2: "PROCESS",
- 3: "JOB",
- 4: "LAUNCHPROCESS",
- 5: "TERMINATEPROCESS",
- 6: "PROCESSOUTPUT",
+ 1: "EXPERIMENT_CANCEL",
+ 2: "TASK",
+ 3: "PROCESS",
+ 4: "JOB",
+ 5: "LAUNCHPROCESS",
+ 6: "TERMINATEPROCESS",
+ 7: "PROCESSOUTPUT",
}
_NAMES_TO_VALUES = {
"EXPERIMENT": 0,
- "TASK": 1,
- "PROCESS": 2,
- "JOB": 3,
- "LAUNCHPROCESS": 4,
- "TERMINATEPROCESS": 5,
- "PROCESSOUTPUT": 6,
+ "EXPERIMENT_CANCEL": 1,
+ "TASK": 2,
+ "PROCESS": 3,
+ "JOB": 4,
+ "LAUNCHPROCESS": 5,
+ "TERMINATEPROCESS": 6,
+ "PROCESSOUTPUT": 7,
}
@@ -927,6 +930,88 @@ class JobIdentifier:
def __ne__(self, other):
return not (self == other)
+class ExperimentSubmitEvent:
+ """
+ Attributes:
+ - experimentId
+ - gatewayId
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'experimentId', None, None, ), # 1
+ (2, TType.STRING, 'gatewayId', None, None, ), # 2
+ )
+
+ def __init__(self, experimentId=None, gatewayId=None,):
+ self.experimentId = experimentId
+ self.gatewayId = gatewayId
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.experimentId = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.gatewayId = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ExperimentSubmitEvent')
+ if self.experimentId is not None:
+ oprot.writeFieldBegin('experimentId', TType.STRING, 1)
+ oprot.writeString(self.experimentId)
+ oprot.writeFieldEnd()
+ if self.gatewayId is not None:
+ oprot.writeFieldBegin('gatewayId', TType.STRING, 2)
+ oprot.writeString(self.gatewayId)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.experimentId is None:
+ raise TProtocol.TProtocolException(message='Required field experimentId is unset!')
+ if self.gatewayId is None:
+ raise TProtocol.TProtocolException(message='Required field gatewayId is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.experimentId)
+ value = (value * 31) ^ hash(self.gatewayId)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class ProcessSubmitEvent:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java
new file mode 100644
index 0000000..7ec7315
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class ExperimentSubmitEvent implements org.apache.thrift.TBase<ExperimentSubmitEvent, ExperimentSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentSubmitEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentSubmitEvent");
+
+ private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new ExperimentSubmitEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ExperimentSubmitEventTupleSchemeFactory());
+ }
+
+ private String experimentId; // required
+ private String gatewayId; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EXPERIMENT_ID((short)1, "experimentId"),
+ GATEWAY_ID((short)2, "gatewayId");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // EXPERIMENT_ID
+ return EXPERIMENT_ID;
+ case 2: // GATEWAY_ID
+ return GATEWAY_ID;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentSubmitEvent.class, metaDataMap);
+ }
+
+ public ExperimentSubmitEvent() {
+ }
+
+ public ExperimentSubmitEvent(
+ String experimentId,
+ String gatewayId)
+ {
+ this();
+ this.experimentId = experimentId;
+ this.gatewayId = gatewayId;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public ExperimentSubmitEvent(ExperimentSubmitEvent other) {
+ if (other.isSetExperimentId()) {
+ this.experimentId = other.experimentId;
+ }
+ if (other.isSetGatewayId()) {
+ this.gatewayId = other.gatewayId;
+ }
+ }
+
+ public ExperimentSubmitEvent deepCopy() {
+ return new ExperimentSubmitEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.experimentId = null;
+ this.gatewayId = null;
+ }
+
+ public String getExperimentId() {
+ return this.experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public void unsetExperimentId() {
+ this.experimentId = null;
+ }
+
+ /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+ public boolean isSetExperimentId() {
+ return this.experimentId != null;
+ }
+
+ public void setExperimentIdIsSet(boolean value) {
+ if (!value) {
+ this.experimentId = null;
+ }
+ }
+
+ public String getGatewayId() {
+ return this.gatewayId;
+ }
+
+ public void setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ }
+
+ public void unsetGatewayId() {
+ this.gatewayId = null;
+ }
+
+ /** Returns true if field gatewayId is set (has been assigned a value) and false otherwise */
+ public boolean isSetGatewayId() {
+ return this.gatewayId != null;
+ }
+
+ public void setGatewayIdIsSet(boolean value) {
+ if (!value) {
+ this.gatewayId = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ if (value == null) {
+ unsetExperimentId();
+ } else {
+ setExperimentId((String)value);
+ }
+ break;
+
+ case GATEWAY_ID:
+ if (value == null) {
+ unsetGatewayId();
+ } else {
+ setGatewayId((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ return getExperimentId();
+
+ case GATEWAY_ID:
+ return getGatewayId();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EXPERIMENT_ID:
+ return isSetExperimentId();
+ case GATEWAY_ID:
+ return isSetGatewayId();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ExperimentSubmitEvent)
+ return this.equals((ExperimentSubmitEvent)that);
+ return false;
+ }
+
+ public boolean equals(ExperimentSubmitEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_experimentId = true && this.isSetExperimentId();
+ boolean that_present_experimentId = true && that.isSetExperimentId();
+ if (this_present_experimentId || that_present_experimentId) {
+ if (!(this_present_experimentId && that_present_experimentId))
+ return false;
+ if (!this.experimentId.equals(that.experimentId))
+ return false;
+ }
+
+ boolean this_present_gatewayId = true && this.isSetGatewayId();
+ boolean that_present_gatewayId = true && that.isSetGatewayId();
+ if (this_present_gatewayId || that_present_gatewayId) {
+ if (!(this_present_gatewayId && that_present_gatewayId))
+ return false;
+ if (!this.gatewayId.equals(that.gatewayId))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_experimentId = true && (isSetExperimentId());
+ list.add(present_experimentId);
+ if (present_experimentId)
+ list.add(experimentId);
+
+ boolean present_gatewayId = true && (isSetGatewayId());
+ list.add(present_gatewayId);
+ if (present_gatewayId)
+ list.add(gatewayId);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(ExperimentSubmitEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetExperimentId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetGatewayId()).compareTo(other.isSetGatewayId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGatewayId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gatewayId, other.gatewayId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ExperimentSubmitEvent(");
+ boolean first = true;
+
+ sb.append("experimentId:");
+ if (this.experimentId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.experimentId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("gatewayId:");
+ if (this.gatewayId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.gatewayId);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!isSetExperimentId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString());
+ }
+
+ if (!isSetGatewayId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class ExperimentSubmitEventStandardSchemeFactory implements SchemeFactory {
+ public ExperimentSubmitEventStandardScheme getScheme() {
+ return new ExperimentSubmitEventStandardScheme();
+ }
+ }
+
+ private static class ExperimentSubmitEventStandardScheme extends StandardScheme<ExperimentSubmitEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // EXPERIMENT_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // GATEWAY_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.experimentId != null) {
+ oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+ oprot.writeString(struct.experimentId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.gatewayId != null) {
+ oprot.writeFieldBegin(GATEWAY_ID_FIELD_DESC);
+ oprot.writeString(struct.gatewayId);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class ExperimentSubmitEventTupleSchemeFactory implements SchemeFactory {
+ public ExperimentSubmitEventTupleScheme getScheme() {
+ return new ExperimentSubmitEventTupleScheme();
+ }
+ }
+
+ private static class ExperimentSubmitEventTupleScheme extends TupleScheme<ExperimentSubmitEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.experimentId);
+ oprot.writeString(struct.gatewayId);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index 48df9b4..59b6f33 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -30,12 +30,13 @@ import org.apache.thrift.TEnum;
public enum MessageType implements org.apache.thrift.TEnum {
EXPERIMENT(0),
- TASK(1),
- PROCESS(2),
- JOB(3),
- LAUNCHPROCESS(4),
- TERMINATEPROCESS(5),
- PROCESSOUTPUT(6);
+ EXPERIMENT_CANCEL(1),
+ TASK(2),
+ PROCESS(3),
+ JOB(4),
+ LAUNCHPROCESS(5),
+ TERMINATEPROCESS(6),
+ PROCESSOUTPUT(7);
private final int value;
@@ -59,16 +60,18 @@ public enum MessageType implements org.apache.thrift.TEnum {
case 0:
return EXPERIMENT;
case 1:
- return TASK;
+ return EXPERIMENT_CANCEL;
case 2:
- return PROCESS;
+ return TASK;
case 3:
- return JOB;
+ return PROCESS;
case 4:
- return LAUNCHPROCESS;
+ return JOB;
case 5:
- return TERMINATEPROCESS;
+ return LAUNCHPROCESS;
case 6:
+ return TERMINATEPROCESS;
+ case 7:
return PROCESSOUTPUT;
default:
return null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index 99c11b8..573304a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -22,6 +22,7 @@ package org.apache.airavata.messaging.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.impl.ExperimentConsumer;
import org.apache.airavata.messaging.core.impl.ProcessConsumer;
import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber;
@@ -45,6 +46,10 @@ public class MessagingFactory {
switch (type) {
case EXPERIMENT_LAUNCH:
+ subscriber = getExperimentSubscriber(rProperties);
+ subscriber.listen(((connection, channel) -> new ExperimentConsumer(messageHandler, connection, channel)),
+ null,
+ routingKeys);
break;
case PROCESS_LAUNCH:
subscriber = getProcessSubscriber(rProperties);
@@ -70,6 +75,7 @@ public class MessagingFactory {
Publisher publiser = null;
switch (type) {
case EXPERIMENT_LAUNCH:
+ publiser = getExperimentPublisher(rProperties);
break;
case PROCESS_LAUNCH:
publiser = gerProcessPublisher(rProperties);
@@ -84,6 +90,11 @@ public class MessagingFactory {
return publiser;
}
+ private static Publisher getExperimentPublisher(RabbitMQProperties rProperties) throws AiravataException {
+ rProperties.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName());
+ return new RabbitMQPublisher(rProperties, messageContext -> rProperties.getExchangeName());
+ }
+
private static Publisher getStatusPublisher(RabbitMQProperties rProperties) throws AiravataException {
rProperties.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName());
return new RabbitMQPublisher(rProperties, MessagingFactory::statusRoutingkey);
@@ -110,7 +121,6 @@ public class MessagingFactory {
return new RabbitMQSubscriber(sp);
}
-
private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException {
sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
.setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
@@ -119,6 +129,14 @@ public class MessagingFactory {
}
+ private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException {
+ sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName())
+ .setAutoAck(false);
+ return new RabbitMQSubscriber(sp);
+
+ }
+
+
private static String statusRoutingkey(MessageContext msgCtx) {
String gatewayId = msgCtx.getGatewayId();
String routingKey = null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index 058b99e..6e4c46a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -22,14 +22,37 @@ package org.apache.airavata.messaging.core.impl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
public class ExperimentConsumer extends QueueingConsumer {
- public ExperimentConsumer(Channel ch) {
- super(ch);
+ private static final Logger log = LoggerFactory.getLogger(ExperimentConsumer.class);
+
+ private MessageHandler handler;
+ private Channel channel;
+ private Connection connection;
+
+ public ExperimentConsumer(MessageHandler messageHandler, Connection connection, Channel channel) {
+ super(channel);
+ this.handler = messageHandler;
+ this.connection = connection;
+ this.channel = channel;
}
@@ -38,5 +61,52 @@ public class ExperimentConsumer extends QueueingConsumer {
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
+
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ long deliveryTag = envelope.getDeliveryTag();
+ if (message.getMessageType() == MessageType.EXPERIMENT || message.getMessageType() == MessageType.EXPERIMENT_CANCEL) {
+ TBase event = null;
+ String gatewayId = null;
+ ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
+ " " +
+ experimentEvent.getExperimentId());
+ event = experimentEvent;
+ gatewayId = experimentEvent.getGatewayId();
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+ message.getMessageId(), gatewayId, deliveryTag);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ messageContext.setIsRedeliver(envelope.isRedeliver());
+ handler.onMessage(messageContext);
+ } else {
+ log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+ "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+ sendAck(deliveryTag);
+ }
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys:" + envelope.getRoutingKey();
+ log.warn(msg, e);
+ }
+
+ }
+
+
+ private void sendAck(long deliveryTag){
+ try {
+ if (channel.isOpen()){
+ channel.basicAck(deliveryTag,false);
+ }else {
+ channel = connection.createChannel();
+ channel.basicQos(ServerSettings.getRabbitmqPrefetchCount());
+ channel.basicAck(deliveryTag, false);
+ }
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index 368c100..e95a7ca 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -48,17 +48,13 @@ public class ProcessConsumer extends QueueingConsumer{
private Connection connection;
public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel){
- this(channel);
+ super(channel);
this.handler = messageHandler;
this.connection = connection;
this.channel = channel;
}
- private ProcessConsumer(Channel ch) {
- super(ch);
- }
-
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 856f9f3..3438475 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -96,7 +96,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(processId, gatewayId, experimentId,
tokenId);
MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.LAUNCHPROCESS, "LAUNCH" +
- ".TASK-" + UUID.randomUUID().toString(), gatewayId);
+ ".PROCESS-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/thrift-interface-descriptions/airavata-apis/messaging_events.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-apis/messaging_events.thrift b/thrift-interface-descriptions/airavata-apis/messaging_events.thrift
index 7ffc88d..2533752 100644
--- a/thrift-interface-descriptions/airavata-apis/messaging_events.thrift
+++ b/thrift-interface-descriptions/airavata-apis/messaging_events.thrift
@@ -36,6 +36,7 @@ enum MessageLevel {
enum MessageType {
EXPERIMENT,
+ EXPERIMENT_CANCEL,
TASK,
PROCESS,
JOB,
@@ -108,6 +109,11 @@ struct JobIdentifier {
// //8:
// }
+struct ExperimentSubmitEvent{
+ 1: required string experimentId,
+ 2: required string gatewayId,
+}
+
struct ProcessSubmitEvent{
1: required string processId,
2: required string gatewayId,