You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:15:36 UTC
[23/81] [abbrv] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and create gface-impl,
removed implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
new file mode 100644
index 0000000..f4c627f
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
@@ -0,0 +1,1151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
+import org.apache.airavata.gfac.core.states.GfacHandlerState;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.FileSystems;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
+ * the resource, required data for the job has to be stored in registry prior to invoke this object.
+ */
+public class BetterGfacImpl implements GFac {
+ private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
+ private static String ERROR_SENT = "ErrorSent";
+ private Registry registry;
+ private CuratorFramework curatorClient;
+ private MonitorPublisher monitorPublisher;
+ private static GFac gfacInstance;
+ private boolean initialized = false;
+
+ private BetterGfacImpl() {
+
+ }
+
+ public static GFac getInstance() {
+ if (gfacInstance == null) {
+ synchronized (BetterGfacImpl.class) {
+ if (gfacInstance == null) {
+ gfacInstance = new BetterGfacImpl();
+ }
+ }
+ }
+ return gfacInstance;
+ }
+
+ @Override
+ public boolean init(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
+ MonitorPublisher publisher) {
+ this.registry = registry;
+ monitorPublisher = publisher; // This is a EventBus common for gfac
+ this.curatorClient = curatorClient;
+ return initialized = true;
+ }
+
+
+ /**
+ * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
+ * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
+ *
+ * @param experimentID
+ * @return
+ * @throws GFacException
+ */
+ @Override
+ public boolean submitJob(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
+ JobExecutionContext jobExecutionContext = null;
+ try {
+ jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
+ jobExecutionContext.setCredentialStoreToken(tokenId);
+ return submitJob(jobExecutionContext);
+ } catch (Exception e) {
+ log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage());
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
+ // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
+ if (jobExecutionContext != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
+ monitorPublisher.publish(event);
+ }
+ throw new GFacException(e);
+ }
+ }
+
+ private JobExecutionContext createJEC(String experimentID, String taskID, String gatewayID) throws Exception {
+
+ JobExecutionContext jobExecutionContext;
+
+ /** FIXME:
+ * A temporary wrapper to co-relate the app catalog and experiment thrift models to old gfac schema documents.
+ * The serviceName in ExperimentData and service name in ServiceDescriptor has to be same.
+ * 1. Get the Task from the task ID and construct the Job object and save it in to registry
+ * 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
+ */
+
+ //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+ String applicationInterfaceId = taskData.getApplicationId();
+ String applicationDeploymentId = taskData.getApplicationDeploymentId();
+ if (null == applicationInterfaceId) {
+ throw new GFacException("Error executing the job. The required Application Id is missing");
+ }
+ if (null == applicationDeploymentId) {
+ throw new GFacException("Error executing the job. The required Application deployment Id is missing");
+ }
+
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+
+ //fetch the compute resource, application interface and deployment information from app catalog
+ ApplicationInterfaceDescription applicationInterface = appCatalog.
+ getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+ ApplicationDeploymentDescription applicationDeployment = appCatalog.
+ getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+ getComputeResource(applicationDeployment.getComputeHostId());
+ ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayID, applicationDeployment.getComputeHostId());
+ if (gatewayResourcePreferences == null) {
+ List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
+ .getGatewayProfileIds(gatewayID);
+ for (String profileId : gatewayProfileIds) {
+ gatewayID = profileId;
+ gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayID, applicationDeployment.getComputeHostId());
+ if (gatewayResourcePreferences != null) {
+ break;
+ }
+ }
+ }
+
+ URL resource = BetterGfacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ Properties configurationProperties = ServerSettings.getProperties();
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), configurationProperties);
+
+ // start constructing jobexecutioncontext
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationInterface.getApplicationName());
+
+ // setting experiment/task/workflownode related information
+ Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
+ jobExecutionContext.setExperiment(experiment);
+ jobExecutionContext.setExperimentID(experimentID);
+ jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
+ jobExecutionContext.setTaskData(taskData);
+ jobExecutionContext.setGatewayID(gatewayID);
+ jobExecutionContext.setAppCatalog(appCatalog);
+
+
+ List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
+ //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
+ for (JobDetails jDetails : jobDetailsList) {
+ jobExecutionContext.setJobDetails(jDetails);
+ }
+ // setting the registry
+ jobExecutionContext.setRegistry(registry);
+
+ ApplicationContext applicationContext = new ApplicationContext();
+ applicationContext.setComputeResourceDescription(computeResource);
+ applicationContext.setApplicationDeploymentDescription(applicationDeployment);
+ applicationContext.setApplicationInterfaceDescription(applicationInterface);
+ applicationContext.setComputeResourcePreference(gatewayResourcePreferences);
+ jobExecutionContext.setApplicationContext(applicationContext);
+
+
+// List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+// jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(experimentInputs)));
+ List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
+ jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
+
+ jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+ jobExecutionContext.setGfac(gfacInstance);
+ jobExecutionContext.setCuratorClient(curatorClient);
+ jobExecutionContext.setMonitorPublisher(monitorPublisher);
+
+ // handle job submission protocol
+ List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
+ Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+ @Override
+ public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+ return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+ }
+ });
+
+ jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
+ } else {
+ throw new GFacException("Compute resource should have at least one job submission interface defined...");
+ }
+ // handle data movement protocol
+ List<DataMovementInterface> dataMovementInterfaces = computeResource.getDataMovementInterfaces();
+ if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) {
+ Collections.sort(dataMovementInterfaces, new Comparator<DataMovementInterface>() {
+ @Override
+ public int compare(DataMovementInterface dataMovementInterface, DataMovementInterface dataMovementInterface2) {
+ return dataMovementInterface.getPriorityOrder() - dataMovementInterface2.getPriorityOrder();
+ }
+ });
+ jobExecutionContext.setHostPrioritizedDataMovementInterfaces(dataMovementInterfaces);
+ }
+
+ // set compute resource configuration as default preferred values, after that replace those with gateway user preferences.
+ populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
+ populateResourceJobManager(jobExecutionContext);
+ // if gateway resource preference is set
+ if (gatewayResourcePreferences != null) {
+ if (gatewayResourcePreferences.getScratchLocation() == null) {
+ gatewayResourcePreferences.setScratchLocation("/tmp");
+ }
+ setUpWorkingLocation(jobExecutionContext, applicationInterface, gatewayResourcePreferences.getScratchLocation());
+
+ jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol());
+ if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
+ jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
+ jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
+ } else {
+ for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
+ if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == jobSubmissionInterface.getJobSubmissionProtocol()) {
+ jobExecutionContext.setPreferredJobSubmissionInterface(jobSubmissionInterface);
+ break;
+ }
+ }
+ }
+
+ if (gatewayResourcePreferences.getLoginUserName() != null) {
+ jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
+ }
+
+ // set gatewayUserPreferred data movement protocol and interface
+ jobExecutionContext.setPreferredDataMovementProtocol(gatewayResourcePreferences.getPreferredDataMovementProtocol());
+ if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
+ jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
+ jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
+ } else {
+ // this check is to avoid NPE when job submission endpoints do
+ // not contain any data movement interfaces.
+ if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
+ for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+ if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
+ jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
+ break;
+ }
+ }
+ }
+ }
+ } else {
+ setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
+ }
+ List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
+ if (taskOutputs == null || taskOutputs.isEmpty()) {
+ taskOutputs = applicationInterface.getApplicationOutputs();
+ }
+
+ for (OutputDataObjectType objectType : taskOutputs) {
+ if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
+ String filePath = objectType.getValue();
+ // if output is not in working folder
+ if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
+ if (objectType.getLocation().startsWith(File.separator)) {
+ filePath = objectType.getLocation() + File.separator + filePath;
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
+ }
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
+ }
+ objectType.setValue(filePath);
+
+ }
+ if (objectType.getType() == DataType.STDOUT) {
+ objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
+ }
+ if (objectType.getType() == DataType.STDERR) {
+ objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
+ }
+ }
+ jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getOuputParamMap(taskOutputs)));
+ return jobExecutionContext;
+ }
+
+ private void setUpWorkingLocation(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, String scratchLocation) {
+ /**
+ * Scratch location
+ */
+ jobExecutionContext.setScratchLocation(scratchLocation);
+
+ /**
+ * Working dir
+ */
+ String workingDir = scratchLocation + File.separator + jobExecutionContext.getExperimentID();
+ jobExecutionContext.setWorkingDir(workingDir);
+
+ /*
+ * Input and Output Directory
+ */
+// jobExecutionContext.setInputDir(workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME);
+ jobExecutionContext.setInputDir(workingDir);
+// jobExecutionContext.setOutputDir(workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME);
+ jobExecutionContext.setOutputDir(workingDir);
+
+ /*
+ * Stdout and Stderr for Shell
+ */
+ jobExecutionContext.setStandardOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout");
+ jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
+ }
+
+ private void populateDefaultComputeResourceConfiguration(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, ComputeResourceDescription computeResource) {
+ Map<FileSystems, String> fileSystems = computeResource.getFileSystems();
+ String scratchLocation = fileSystems.get(FileSystems.SCRATCH);
+ if (scratchLocation != null) {
+ setUpWorkingLocation(jobExecutionContext, applicationInterface, scratchLocation);
+ }
+
+ jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
+ jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
+
+ if (jobExecutionContext.getHostPrioritizedDataMovementInterfaces() != null) {
+ jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
+ jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
+ }
+ }
+
+ private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
+ try {
+ JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+ JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ if (submissionProtocol == JobSubmissionProtocol.SSH) {
+ SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission != null) {
+ jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
+ }
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (localJobSubmission != null) {
+ jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
+ }
+ }
+ } catch (AppCatalogException e) {
+ log.error("Error occured while retrieving job submission interface", e);
+ }
+ }
+
+ private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
+ // We need to check whether this job is submitted as a part of a large workflow. If yes,
+ // we need to setup workflow tracking listerner.
+ try {
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ // Register log event listener. This is required in all scenarios.
+ if (isNewJob(gfacExpState)) {
+ // In this scenario We do everything from the beginning
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
+ launch(jobExecutionContext);
+ } else if (isCompletedJob(gfacExpState)) {
+ log.info("There is nothing to recover in this job so we do not re-submit");
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+ AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
+ } else {
+ // Now we know this is an old Job, so we have to handle things gracefully
+ log.info("Re-launching the job in GFac because this is re-submitted to GFac");
+ reLaunch(jobExecutionContext, gfacExpState);
+ }
+ return true;
+ } catch (Exception e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("Error launching the Job", e);
+ }
+ }
+
+ private boolean isCompletedJob(GfacExperimentState gfacExpState) {
+ switch (gfacExpState) {
+ case COMPLETED:
+ case FAILED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private boolean isNewJob(GfacExperimentState stateVal) {
+ switch (stateVal) {
+ case UNKNOWN:
+ case LAUNCHED:
+ case ACCEPTED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
+ JobExecutionContext jobExecutionContext = null;
+ try {
+ jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
+ jobExecutionContext.setCredentialStoreToken(tokenId);
+ return cancel(jobExecutionContext);
+ } catch (Exception e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ log.error("Error cancelling the job with experiment ID: " + experimentID);
+ throw new GFacException(e);
+ }
+ }
+
+ private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ String workflowInstanceID = null;
+ if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+ //todo implement WorkflowTrackingListener properly
+ }
+ if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
+ || gfacExpState == GfacExperimentState.PROVIDERINVOKED) { // we already have changed registry status, we need to handle job canceling scenario.
+ log.info("Job is in a position to perform a proper cancellation");
+ try {
+ Scheduler.schedule(jobExecutionContext);
+ invokeProviderCancel(jobExecutionContext);
+ } catch (GFacException e) {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ throw new GFacException(e.getMessage(), e);
+ }
+ }
+// else if (gfacExpState == GfacExperimentState.INHANDLERSINVOKING || gfacExpState == GfacExperimentState.INHANDLERSINVOKED || gfacExpState == GfacExperimentState.OUTHANDLERSINVOKING){
+// log.info("Experiment should be immedietly cancelled");
+// GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.CANCELED);
+//
+// }
+ return true;
+ } catch (Exception e) {
+ log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+ throw new GFacException(e.getMessage(), e);
+ }
+ }
+
+ private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ String experimentID = jobExecutionContext.getExperimentID();
+ try {
+ Scheduler.schedule(jobExecutionContext);
+
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
+
+ // After executing the in handlers provider instance should be set
+ // to job execution context.
+ // We get the provider instance and execute it.
+ switch (state) {
+ case INHANDLERSINVOKING:
+ reInvokeInFlowHandlers(jobExecutionContext);
+ case INHANDLERSINVOKED:
+ invokeProviderExecute(jobExecutionContext);
+ break;
+ case PROVIDERINVOKING:
+ reInvokeProviderExecute(jobExecutionContext, true);
+ break;
+ case JOBSUBMITTED:
+ reInvokeProviderExecute(jobExecutionContext, false);
+ case PROVIDERINVOKED:
+ // no need to re-run the job
+ log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
+ if (!GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ monitorJob(jobExecutionContext);
+ } else {
+ // TODO - Need to handle this correctly , for now we will invoke ouput handlers.
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ break;
+ case OUTHANDLERSINVOKING:
+ reInvokeOutFlowHandlers(jobExecutionContext);
+ break;
+ case OUTHANDLERSINVOKED:
+ case COMPLETED:
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.COMPLETED);
+ break;
+ case FAILED:
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
+ break;
+ case UNKNOWN:
+ log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
+ break;
+ default:
+ throw new GFacException("Un-handled GfacExperimentState : " + state.name());
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ JobIdentifier jobIdentity = new JobIdentifier(
+ jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ throw new GFacException(e.getMessage(), e);
+ }
+ }
+
+ private void monitorJob(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ provider.monitor(jobExecutionContext);
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+
+ }
+
+ private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ try {
+ Scheduler.schedule(jobExecutionContext);
+
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
+ if (!isCancelling(jobExecutionContext)) {
+ invokeInFlowHandlers(jobExecutionContext); // to keep the
+ // consistency we always
+ // try to re-run to
+ // avoid complexity
+ } else {
+ log.info("Experiment is cancelled, so launch operation is stopping immediately");
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
+ }
+ // if (experimentID != null){
+ // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+ // }
+
+ // After executing the in handlers provider instance should be set
+ // to job execution context.
+ // We get the provider instance and execute it.
+ if (!isCancelling(jobExecutionContext)) {
+ invokeProviderExecute(jobExecutionContext);
+ } else {
+ log.info("Experiment is cancelled, so launch operation is stopping immediately");
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ return;
+ }
+ } catch (Exception e) {
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ // monitorPublisher.publish(new
+ // ExperimentStatusChangedEvent(new
+ // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ // ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
+ // monitorPublisher.publish(new TaskStatusChangeRequest(
+ // new TaskIdentity(jobExecutionContext.getExperimentID(),
+ // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ // jobExecutionContext.getTaskData().getTaskID()),
+ // TaskState.FAILED
+ // ));
+ JobIdentifier jobIdentity = new JobIdentifier(
+ jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ throw new GFacException(e.getMessage(), e);
+ }
+ }
+
+ private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
+ initProvider(provider, jobExecutionContext);
+ executeProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ }
+
+ private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ if (submit) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
+ if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
+ initProvider(provider, jobExecutionContext);
+ executeProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ } else {
+ provider.recover(jobExecutionContext);
+ }
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ } else {
+ disposeProvider(provider, jobExecutionContext);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+ }
+
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+
+ }
+
+ private boolean invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ initProvider(provider, jobExecutionContext);
+ cancelProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ return true;
+ }
+
+ // TODO - Did refactoring, but need to recheck the logic again.
+ private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
+ if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
+ initProvider(provider, jobExecutionContext);
+ cancelProvider(provider, jobExecutionContext);
+ disposeProvider(provider, jobExecutionContext);
+ } else {
+ provider.recover(jobExecutionContext);
+ }
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+ }
+
+ if (GFacUtils.isSynchronousMode(jobExecutionContext))
+
+ {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+
+ }
+
+
+ private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.initialize(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while initializing provider " + provider.getClass().getName() + ".", e);
+ }
+ }
+
+ private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.execute(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
+ }
+ }
+
+ private boolean cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ return provider.cancelJob(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
+ }
+ }
+
+ private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ try {
+ provider.dispose(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException("Error while invoking provider " + provider.getClass().getName() + " dispose method.", e);
+ }
+ }
+
+// private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
+// String workflowNodeID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
+// String topic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+// String brokerUrl = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
+// jobExecutionContext.getNotificationService().registerListener(
+// new WorkflowTrackingListener(workflowInstanceID, workflowNodeID, brokerUrl, topic));
+//
+// }
+
+ private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ if (!isCancelling(jobExecutionContext)) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ // if exception thrown before that we do not make it finished
+ } catch (GFacHandlerException e) {
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ }
+ } else {
+ log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ break;
+ }
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
+ String experimentPath = null;
+ try {
+ experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+ if (curatorClient.checkExists().forPath(experimentPath) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
+ }
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ }
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ if (!isCancel(jobExecutionContext)) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException | IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ } catch (Exception e) {
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacException(e);
+ }
+ } else {
+ log.info("Experiment execution is cancelled, so OutHandler invocation is stopped");
+ if (isCancelling(jobExecutionContext)) {
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+ }
+ break;
+ }
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ } catch (Exception e) {
+ throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
+ }
+ } catch (Exception e) {
+ throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
+ }
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+
+ }
+
+ /**
+ * If handlers ran successfully we re-run only recoverable handlers
+ * If handler never ran we run the normal invoke method
+ *
+ * @param jobExecutionContext
+ * @throws GFacException
+ */
+ // TODO - Did refactoring, but need to recheck the logic again.
+ private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ handler.initProperties(handlerClassName.getProperties());
+ if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
+ log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+ handler.invoke(jobExecutionContext);
+ } else {
+ // if these already ran we re-run only recoverable handlers
+ log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+ handler.recover(jobExecutionContext);
+ }
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ } catch (GFacHandlerException e) {
+ throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+ } catch (ClassNotFoundException e) {
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+ , GfacExperimentState.INHANDLERSINVOKED));
+ } catch (Exception e) {
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacException("Error while re-invoking output handlers", e);
+ }
+ }
+
+ // TODO - Did refactoring, but need to recheck the logic again.
+ @Override
+ public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ if (!initialized) {
+ throw new GFacException("Initialize the Gfac instance before use it");
+ }
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ launch(jobExecutionContext);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
+ log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
+ handler.initProperties(handlerClassName.getProperties());
+ handler.invoke(jobExecutionContext);
+ } else {
+ // if these already ran we re-run only recoverable handlers
+ handler.recover(jobExecutionContext);
+ }
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ } catch (ClassNotFoundException e) {
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+ }
+
+ private boolean isCancelled(JobExecutionContext executionContext) {
+ // we should check whether experiment is cancelled using registry
+ try {
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
+ ExperimentState experimentState = status.getExperimentState();
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELED) {
+ return true;
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ // on error we return false.
+ }
+ return false;
+ }
+
+ private boolean isCancelling(JobExecutionContext executionContext) {
+ // check whether cancelling request came
+ try {
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
+ ExperimentState experimentState = status.getExperimentState();
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELING) {
+ return true;
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ // on error we return false;
+ }
+ return false;
+ }
+
+ private boolean isCancel(JobExecutionContext jobExecutionContext) {
+ try {
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, jobExecutionContext.getExperimentID());
+ if (status != null) {
+ ExperimentState experimentState = status.getExperimentState();
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED) {
+ return true;
+ }
+ }
+ }
+ } catch (RegistryException e) {
+ // on error we return false;
+ }
+ return false;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
new file mode 100644
index 0000000..a45eb23
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
+ private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
+
+ private CuratorFramework curatorClient;
+
+ private static Integer mutex = -1;
+
+ @Subscribe
+ public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception {
+ logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
+ MonitorID monitorID = statusChangeRequest.getMonitorID();
+ String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+ + File.separator + statusChangeRequest.getMonitorID().getExperimentID();
+ Stat exists = null;
+ if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
+ exists = curatorClient.checkExists().forPath(experimentPath);
+ if (exists == null) {
+ logger.error("ZK path: " + experimentPath + " does not exists !!");
+ return;
+ }
+ Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (state == null) {
+ // state znode has to be created
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
+ forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
+ } else {
+ curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
+ }
+ }
+ switch (statusChangeRequest.getState()) {
+ case COMPLETED:
+ logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
+ break;
+ case FAILED:
+ logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
+ break;
+ default:
+ }
+ }
+
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof CuratorFramework) {
+ this.curatorClient = (CuratorFramework) configuration;
+ }
+ }
+ }
+
+ public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
+ synchronized (mutex) {
+ Event.KeeperState state = watchedEvent.getState();
+ if (state == Event.KeeperState.SyncConnected) {
+ mutex.notify();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
new file mode 100644
index 0000000..461cc1e
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFac;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InputHandlerWorker implements Runnable {
+ private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
+
+ String experimentId;
+ String taskId;
+ String gatewayId;
+ String tokenId;
+
+ GFac gfac;
+ public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId, String tokenId) {
+ this.gfac = gfac;
+ this.experimentId = experimentId;
+ this.taskId = taskId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ gfac.submitJob(experimentId, taskId, gatewayId, tokenId);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
new file mode 100644
index 0000000..5adaed6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class OutHandlerWorker implements Runnable {
+ private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
+
+ private GFac gfac;
+
+ private MonitorID monitorID;
+
+ private MonitorPublisher monitorPublisher;
+ private JobExecutionContext jEC;
+
+ public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+ this.gfac = gfac;
+ this.monitorID = monitorID;
+ this.monitorPublisher = monitorPublisher;
+ this.jEC = monitorID.getJobExecutionContext();
+ }
+
+ public OutHandlerWorker(JobExecutionContext jEC) {
+ this.jEC = jEC;
+ this.gfac = jEC.getGfac();
+ this.monitorPublisher = jEC.getMonitorPublisher();
+ }
+
+ @Override
+ public void run() {
+ try {
+// gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
+ gfac.invokeOutFlowHandlers(jEC);
+ } catch (Exception e) {
+ logger.error(e.getMessage(),e);
+ TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
+ //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
+ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(monitorID.getJobExecutionContext(), errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ logger.error("Error while persisting error details", e);
+ }
+ logger.info(e.getLocalizedMessage(), e);
+ // Save error details to registry
+
+ }
+// monitorPublisher.publish(monitorID.getStatus());
+ monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
new file mode 100644
index 0000000..81bcf66
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class OutputUtils {
+ private static String regexPattern = "\\s*=\\s*(.*)\\r?\\n";
+
+ public static void fillOutputFromStdout(Map<String, Object> output, String stdout, String stderr, List<OutputDataObjectType> outputArray) throws Exception {
+ // this is no longer correct
+// if (stdout == null || stdout.equals("")) {
+// throw new GFacHandlerException("Standard output is empty.");
+// }
+
+ Set<String> keys = output.keySet();
+ OutputDataObjectType actual = null;
+ OutputDataObjectType resultOutput = null;
+ for (String paramName : keys) {
+ actual = (OutputDataObjectType) output.get(paramName);
+ // if parameter value is not already set, we let it go
+
+ if (actual == null) {
+ continue;
+ }
+ resultOutput = new OutputDataObjectType();
+ if (DataType.STDOUT == actual.getType()) {
+ actual.setValue(stdout);
+ resultOutput.setName(paramName);
+ resultOutput.setType(DataType.STDOUT);
+ resultOutput.setValue(stdout);
+ outputArray.add(resultOutput);
+ } else if (DataType.STDERR == actual.getType()) {
+ actual.setValue(stderr);
+ resultOutput.setName(paramName);
+ resultOutput.setType(DataType.STDERR);
+ resultOutput.setValue(stderr);
+ outputArray.add(resultOutput);
+ }
+// else if ("URI".equals(actual.getType().getType().toString())) {
+// continue;
+// }
+ else {
+ String parseStdout = parseStdout(stdout, paramName);
+ if (parseStdout != null) {
+ actual.setValue(parseStdout);
+ resultOutput.setName(paramName);
+ resultOutput.setType(DataType.STRING);
+ resultOutput.setValue(parseStdout);
+ outputArray.add(resultOutput);
+ }
+ }
+ }
+ }
+
+ private static String parseStdout(String stdout, String outParam) throws Exception {
+ String regex = Pattern.quote(outParam) + regexPattern;
+ String match = null;
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(stdout);
+ while (matcher.find()) {
+ match = matcher.group(1);
+ }
+ if (match != null) {
+ match = match.trim();
+ return match;
+ }
+ return null;
+ }
+
+ public static String[] parseStdoutArray(String stdout, String outParam) throws Exception {
+ String regex = Pattern.quote(outParam) + regexPattern;
+ StringBuffer match = new StringBuffer();
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(stdout);
+ while (matcher.find()) {
+ match.append(matcher.group(1) + StringUtil.DELIMETER);
+ }
+ if (match != null && match.length() >0) {
+ return StringUtil.getElementsFromString(match.toString());
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
new file mode 100644
index 0000000..2f9e3b0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Properties;
+
+public class LocalDirectorySetupHandler implements GFacHandler {
+ private static final Logger log = LoggerFactory.getLogger(LocalDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ log.info("Invoking LocalDirectorySetupHandler ...");
+ log.debug("working directory = " + jobExecutionContext.getWorkingDir());
+ log.debug("temp directory = " + jobExecutionContext.getWorkingDir());
+
+ makeFileSystemDir(jobExecutionContext.getWorkingDir());
+ makeFileSystemDir(jobExecutionContext.getInputDir());
+ makeFileSystemDir(jobExecutionContext.getOutputDir());
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ private void makeFileSystemDir(String dir) throws GFacHandlerException {
+ File f = new File(dir);
+ if (f.isDirectory() && f.exists()) {
+ return;
+ } else if (!new File(dir).mkdir()) {
+ throw new GFacHandlerException("Cannot create directory " + dir);
+ }
+ }
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+
+ }
+}