You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:49 UTC

[32/39] airavata git commit: Refactored gfac sub modules, merged gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules and create gface-impl, removed implementation from gfac-core to gfac-impl

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
deleted file mode 100644
index 2cc375e..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.cpi;
-
-import org.airavata.appcatalog.cpi.AppCatalog;
-import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacConfiguration;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.Scheduler;
-import org.apache.airavata.gfac.core.context.ApplicationContext;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.GFacHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
-import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
-import org.apache.airavata.gfac.core.provider.GFacProvider;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
-import org.apache.airavata.model.appcatalog.computeresource.FileSystems;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * This is the GFac CPI class for external usage. It simply has a single method to submit a job to
- * the resource; the data required for the job has to be stored in the registry before this object is invoked.
- */
-public class BetterGfacImpl implements GFac {
-    private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
-    // Property key set to "true" on a JobExecutionContext after a FAILED state has been published for it.
-    private static final String ERROR_SENT = "ErrorSent";
-    // Collaborators handed over through init(...); they stay null until init(...) is called.
-    private Registry registry;
-    private CuratorFramework curatorClient;
-    private MonitorPublisher monitorPublisher;
-    // The single shared instance handed out by getInstance().
-    private static GFac gfacInstance;
-    // Flipped to true by init(...); submitJob/cancel refuse to run while it is false.
-    private boolean initialized = false;
-
-    /**
-     * Not instantiable from outside; instances are obtained through {@link #getInstance()}.
-     */
-    private BetterGfacImpl() {
-        // intentionally empty
-    }
-
-    /**
-     * Returns the shared {@link GFac} instance, creating it on first use.
-     * <p>
-     * The whole method is synchronized (on {@code BetterGfacImpl.class}, the same monitor the previous
-     * implementation used) because {@code gfacInstance} is not declared {@code volatile}; an unsynchronized
-     * first null-check on such a field is a data race under the Java memory model.
-     *
-     * @return the shared instance; it still has to be set up through {@code init(...)} before use
-     */
-    public static synchronized GFac getInstance() {
-        if (gfacInstance == null) {
-            gfacInstance = new BetterGfacImpl();
-        }
-        return gfacInstance;
-    }
-
-    /**
-     * Stores the collaborators this instance works with and marks it as initialized.
-     * The {@code appCatalog} argument is accepted but not stored by this method.
-     *
-     * @param registry      registry used to look up experiment and task data
-     * @param appCatalog    not used here
-     * @param curatorClient curator client kept for later use
-     * @param publisher     event bus common for gfac
-     * @return always {@code true}
-     */
-    @Override
-    public boolean init(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
-                        MonitorPublisher publisher) {
-        this.registry = registry;
-        this.curatorClient = curatorClient;
-        this.monitorPublisher = publisher;
-        this.initialized = true;
-        return true;
-    }
-
-
-    /**
-     * This is the job launching method outsiders of GFac can use. It builds the job execution context for the
-     * given task and hands it to the internal submit logic, which invokes the GFac handler chain and providers.
-     * <p>
-     * On any failure the error is logged with its stack trace. If a job execution context exists, the error
-     * details are saved and FAILED state change requests are published for the experiment and the task.
-     * The failure is then rethrown wrapped in a {@link GFacException}.
-     *
-     * @param experimentID id of the experiment the task belongs to
-     * @param taskID       id of the task to launch
-     * @param gatewayID    id of the gateway submitting the job
-     * @param tokenId      credential store token set on the job execution context
-     * @return the result of the internal submit call
-     * @throws GFacException if this instance is not initialized or the submission fails
-     */
-    @Override
-    public boolean submitJob(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
-        if (!initialized) {
-            throw new GFacException("Initialize the Gfac instance before use it");
-        }
-        JobExecutionContext jobExecutionContext = null;
-        try {
-            jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
-            jobExecutionContext.setCredentialStoreToken(tokenId);
-            return submitJob(jobExecutionContext);
-        } catch (Exception e) {
-            // pass the throwable so the stack trace reaches the log even when nothing can be persisted
-            log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage(), e);
-            // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
-            // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
-            // createJEC may have failed before a context existed; then there is nothing to save against or publish for.
-            // NOTE(review): assumes saveErrorDetails cannot work without a context - confirm against GFacUtils.
-            if (jobExecutionContext != null) {
-                StringWriter errors = new StringWriter();
-                e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getGatewayID());
-                TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
-                monitorPublisher.publish(event);
-            }
-            throw new GFacException(e);
-        }
-    }
-
-    private JobExecutionContext createJEC(String experimentID, String taskID, String gatewayID) throws Exception {
-
-        JobExecutionContext jobExecutionContext;
-
-        /** FIXME:
-         * A temporary wrapper to co-relate the app catalog and experiment thrift models to old gfac schema documents.
-         * The serviceName in ExperimentData and service name in ServiceDescriptor has to be same.
-         * 1. Get the Task from the task ID and construct the Job object and save it in to registry
-         * 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
-         */
-
-        //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
-        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
-
-        String applicationInterfaceId = taskData.getApplicationId();
-        String applicationDeploymentId = taskData.getApplicationDeploymentId();
-        if (null == applicationInterfaceId) {
-            throw new GFacException("Error executing the job. The required Application Id is missing");
-        }
-        if (null == applicationDeploymentId) {
-            throw new GFacException("Error executing the job. The required Application deployment Id is missing");
-        }
-
-        AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-
-        //fetch the compute resource, application interface and deployment information from app catalog
-        ApplicationInterfaceDescription applicationInterface = appCatalog.
-                getApplicationInterface().getApplicationInterface(applicationInterfaceId);
-        ApplicationDeploymentDescription applicationDeployment = appCatalog.
-                getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
-        ComputeResourceDescription computeResource = appCatalog.getComputeResource().
-                getComputeResource(applicationDeployment.getComputeHostId());
-        ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
-                getComputeResourcePreference(gatewayID, applicationDeployment.getComputeHostId());
-        if (gatewayResourcePreferences == null) {
-            List<String> gatewayProfileIds = appCatalog.getGatewayProfile()
-                    .getGatewayProfileIds(gatewayID);
-            for (String profileId : gatewayProfileIds) {
-                gatewayID = profileId;
-                gatewayResourcePreferences = appCatalog.getGatewayProfile().
-                        getComputeResourcePreference(gatewayID, applicationDeployment.getComputeHostId());
-                if (gatewayResourcePreferences != null) {
-                    break;
-                }
-            }
-        }
-
-        URL resource = BetterGfacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
-        Properties configurationProperties = ServerSettings.getProperties();
-        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), configurationProperties);
-
-        // start constructing jobexecutioncontext
-        jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationInterface.getApplicationName());
-
-        // setting experiment/task/workflownode related information
-        Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
-        jobExecutionContext.setExperiment(experiment);
-        jobExecutionContext.setExperimentID(experimentID);
-        jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
-        jobExecutionContext.setTaskData(taskData);
-        jobExecutionContext.setGatewayID(gatewayID);
-        jobExecutionContext.setAppCatalog(appCatalog);
-
-
-        List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
-        //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
-        for (JobDetails jDetails : jobDetailsList) {
-            jobExecutionContext.setJobDetails(jDetails);
-        }
-        // setting the registry
-        jobExecutionContext.setRegistry(registry);
-
-        ApplicationContext applicationContext = new ApplicationContext();
-        applicationContext.setComputeResourceDescription(computeResource);
-        applicationContext.setApplicationDeploymentDescription(applicationDeployment);
-        applicationContext.setApplicationInterfaceDescription(applicationInterface);
-        applicationContext.setComputeResourcePreference(gatewayResourcePreferences);
-        jobExecutionContext.setApplicationContext(applicationContext);
-
-
-//        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
-//        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(experimentInputs)));
-        List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
-        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
-
-        jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
-        jobExecutionContext.setGfac(gfacInstance);
-        jobExecutionContext.setCuratorClient(curatorClient);
-        jobExecutionContext.setMonitorPublisher(monitorPublisher);
-
-        // handle job submission protocol
-        List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
-        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
-            Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
-                @Override
-                public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
-                    return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
-                }
-            });
-
-            jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
-        } else {
-            throw new GFacException("Compute resource should have at least one job submission interface defined...");
-        }
-        // handle data movement protocol
-        List<DataMovementInterface> dataMovementInterfaces = computeResource.getDataMovementInterfaces();
-        if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) {
-            Collections.sort(dataMovementInterfaces, new Comparator<DataMovementInterface>() {
-                @Override
-                public int compare(DataMovementInterface dataMovementInterface, DataMovementInterface dataMovementInterface2) {
-                    return dataMovementInterface.getPriorityOrder() - dataMovementInterface2.getPriorityOrder();
-                }
-            });
-            jobExecutionContext.setHostPrioritizedDataMovementInterfaces(dataMovementInterfaces);
-        }
-
-        // set compute resource configuration as default preferred values, after that replace those with gateway user preferences.
-        populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
-        populateResourceJobManager(jobExecutionContext);
-        // if gateway resource preference is set
-        if (gatewayResourcePreferences != null) {
-            if (gatewayResourcePreferences.getScratchLocation() == null) {
-                gatewayResourcePreferences.setScratchLocation("/tmp");
-            }
-            setUpWorkingLocation(jobExecutionContext, applicationInterface, gatewayResourcePreferences.getScratchLocation());
-
-            jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol());
-            if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
-                jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
-                jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
-            } else {
-                for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
-                    if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == jobSubmissionInterface.getJobSubmissionProtocol()) {
-                        jobExecutionContext.setPreferredJobSubmissionInterface(jobSubmissionInterface);
-                        break;
-                    }
-                }
-            }
-
-            if (gatewayResourcePreferences.getLoginUserName() != null) {
-                jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
-            }
-
-            // set gatewayUserPreferred data movement protocol and interface
-            jobExecutionContext.setPreferredDataMovementProtocol(gatewayResourcePreferences.getPreferredDataMovementProtocol());
-            if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
-                jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
-                jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
-            } else {
-                // this check is to avoid NPE when job submission endpoints do
-                // not contain any data movement interfaces.
-                if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
-                    for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
-                        if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
-                            jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
-                            break;
-                        }
-                    }
-                }
-            }
-        } else {
-            setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
-        }
-        List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
-        if (taskOutputs == null || taskOutputs.isEmpty()) {
-            taskOutputs = applicationInterface.getApplicationOutputs();
-        }
-
-        for (OutputDataObjectType objectType : taskOutputs) {
-            if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
-                String filePath = objectType.getValue();
-                // if output is not in working folder
-                if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
-                    if (objectType.getLocation().startsWith(File.separator)) {
-                        filePath = objectType.getLocation() + File.separator + filePath;
-                    } else {
-                        filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
-                    }
-                } else {
-                    filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
-                }
-                objectType.setValue(filePath);
-
-            }
-            if (objectType.getType() == DataType.STDOUT) {
-                objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
-            }
-            if (objectType.getType() == DataType.STDERR) {
-                objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
-            }
-        }
-        jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getOuputParamMap(taskOutputs)));
-        return jobExecutionContext;
-    }
-
-    private void setUpWorkingLocation(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, String scratchLocation) {
-        /**
-         * Scratch location
-         */
-        jobExecutionContext.setScratchLocation(scratchLocation);
-
-        /**
-         * Working dir
-         */
-        String workingDir = scratchLocation + File.separator + jobExecutionContext.getExperimentID();
-        jobExecutionContext.setWorkingDir(workingDir);
-
-            /*
-            * Input and Output Directory
-            */
-//        jobExecutionContext.setInputDir(workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME);
-        jobExecutionContext.setInputDir(workingDir);
-//        jobExecutionContext.setOutputDir(workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME);
-        jobExecutionContext.setOutputDir(workingDir);
-
-            /*
-            * Stdout and Stderr for Shell
-            */
-        jobExecutionContext.setStandardOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout");
-        jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
-    }
-
-    /**
-     * Applies the compute resource's own settings as the default preferences on the context: the scratch file
-     * system (when one is defined) and the first host-prioritized job submission and data movement interfaces.
-     *
-     * @param jobExecutionContext  context to populate; its host-prioritized interface lists must already be set
-     * @param applicationInterface passed through to the working location set-up
-     * @param computeResource      compute resource whose file systems are consulted
-     */
-    private void populateDefaultComputeResourceConfiguration(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, ComputeResourceDescription computeResource) {
-        Map<FileSystems, String> hostFileSystems = computeResource.getFileSystems();
-        String hostScratch = hostFileSystems.get(FileSystems.SCRATCH);
-        if (hostScratch != null) {
-            setUpWorkingLocation(jobExecutionContext, applicationInterface, hostScratch);
-        }
-
-        // job submission: the first entry of the prioritized list becomes the default
-        jobExecutionContext.setPreferredJobSubmissionInterface(
-                jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
-        jobExecutionContext.setPreferredJobSubmissionProtocol(
-                jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
-
-        // data movement: nothing to default when the host list was never set
-        if (jobExecutionContext.getHostPrioritizedDataMovementInterfaces() == null) {
-            return;
-        }
-        jobExecutionContext.setPreferredDataMovementInterface(
-                jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
-        jobExecutionContext.setPreferredDataMovementProtocol(
-                jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
-    }
-
-    /**
-     * Looks up the resource job manager for the context's preferred job submission interface and stores it on the
-     * context. Only the SSH and LOCAL protocols are handled; for any other protocol nothing is set.
-     * An {@link AppCatalogException} raised by the lookup is logged and not propagated.
-     *
-     * @param jobExecutionContext context whose preferred job submission protocol and interface are read
-     */
-    private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
-        try {
-            JobSubmissionProtocol protocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
-            JobSubmissionInterface preferredInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
-
-            if (protocol == JobSubmissionProtocol.SSH) {
-                SSHJobSubmission ssh = GFacUtils.getSSHJobSubmission(preferredInterface.getJobSubmissionInterfaceId());
-                if (ssh == null) {
-                    return;
-                }
-                jobExecutionContext.setResourceJobManager(ssh.getResourceJobManager());
-                return;
-            }
-
-            if (protocol != JobSubmissionProtocol.LOCAL) {
-                // other protocols carry no resource job manager here
-                return;
-            }
-            LOCALSubmission local = GFacUtils.getLocalJobSubmission(preferredInterface.getJobSubmissionInterfaceId());
-            if (local != null) {
-                jobExecutionContext.setResourceJobManager(local.getResourceJobManager());
-            }
-        } catch (AppCatalogException e) {
-            log.error("Error occured while retrieving job submission interface", e);
-        }
-    }
-
-    /**
-     * Decides, from the experiment state read from ZooKeeper, whether to launch the job from the beginning, skip
-     * it because it already finished, or re-launch it from where it stopped.
-     *
-     * @param jobExecutionContext fully populated context of the job
-     * @return {@code true} when the chosen path finished without an exception
-     * @throws GFacException wrapping any failure; the error details are saved before it is thrown
-     */
-    private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            // this is the original state came, if we query again it might be different, so we preserve this state in the environment
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);
-            if (isNewJob(gfacExpState)) {
-                // In this scenario We do everything from the beginning
-                // immediately we get the request we update the status
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                        , GfacExperimentState.ACCEPTED));
-                launch(jobExecutionContext);
-            } else if (isCompletedJob(gfacExpState)) {
-                log.info("There is nothing to recover in this job so we do not re-submit");
-                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
-            } else {
-                // Now we know this is an old Job, so we have to handle things gracefully
-                log.info("Re-launching the job in GFac because this is re-submitted to GFac");
-                reLaunch(jobExecutionContext, gfacExpState);
-            }
-            return true;
-        } catch (Exception e) {
-            // getCause() is null for exceptions created without a cause; fall back to the exception itself so
-            // recording the error cannot fail with a NullPointerException and hide the real problem
-            Throwable reported = (e.getCause() != null) ? e.getCause() : e;
-            GFacUtils.saveErrorDetails(jobExecutionContext, reported.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            throw new GFacException("Error launching the Job", e);
-        }
-    }
-
-    /**
-     * Tells whether the given state is terminal, i.e. COMPLETED or FAILED.
-     *
-     * @param gfacExpState state to test
-     * @return {@code true} for COMPLETED and FAILED, {@code false} for every other state
-     */
-    private boolean isCompletedJob(GfacExperimentState gfacExpState) {
-        boolean terminal;
-        switch (gfacExpState) {
-            case COMPLETED:
-            case FAILED:
-                terminal = true;
-                break;
-            default:
-                terminal = false;
-                break;
-        }
-        return terminal;
-    }
-
-    /**
-     * Tells whether the given state means the job has to be run from the beginning.
-     *
-     * @param stateVal state to test
-     * @return {@code true} for UNKNOWN, LAUNCHED and ACCEPTED, {@code false} for every other state
-     */
-    private boolean isNewJob(GfacExperimentState stateVal) {
-        boolean startFromScratch;
-        switch (stateVal) {
-            case UNKNOWN:
-            case LAUNCHED:
-            case ACCEPTED:
-                startFromScratch = true;
-                break;
-            default:
-                startFromScratch = false;
-                break;
-        }
-        return startFromScratch;
-    }
-
-    @Override
-    public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
-        if (!initialized) {
-            throw new GFacException("Initialize the Gfac instance before use it");
-        }
-        JobExecutionContext jobExecutionContext = null;
-        try {
-            jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
-            jobExecutionContext.setCredentialStoreToken(tokenId);
-            return cancel(jobExecutionContext);
-        } catch (Exception e) {
-            GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            log.error("Error cancelling the job with experiment ID: " + experimentID);
-            throw new GFacException(e);
-        }
-    }
-
-    /**
-     * Cancels a job based on the experiment state read from ZooKeeper. A provider-level cancel is only attempted
-     * when the state is PROVIDERINVOKING, JOBSUBMITTED or PROVIDERINVOKED; in every other state the method returns
-     * {@code true} without doing anything further.
-     *
-     * @param jobExecutionContext fully populated context of the job
-     * @return {@code true} when no exception occurred
-     * @throws GFacException if scheduling or the provider cancel fails; the experiment is marked FAILED first
-     */
-    private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            // this is the original state came, if we query again it might be different, so we preserve this state in the environment
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);
-            //todo implement WorkflowTrackingListener properly for jobs carrying Constants.PROP_WORKFLOW_INSTANCE_ID
-            // Register log event listener. This is required in all scenarios.
-            jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-
-            if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
-                    || gfacExpState == GfacExperimentState.PROVIDERINVOKED) { // we already have changed registry status, we need to handle job canceling scenario.
-                log.info("Job is in a position to perform a proper cancellation");
-                try {
-                    Scheduler.schedule(jobExecutionContext);
-                    invokeProviderCancel(jobExecutionContext);
-                } catch (GFacException e) {
-                    // we make the experiment as failed due to exception scenario
-                    monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                    jobExecutionContext.setProperty(ERROR_SENT, "true");
-                    // a GFacException created without a cause would otherwise publish a null failure
-                    Throwable failure = (e.getCause() != null) ? e.getCause() : e;
-                    jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(failure));
-                    throw new GFacException(e.getMessage(), e);
-                }
-            }
-            return true;
-        } catch (Exception e) {
-            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-            throw new GFacException(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Resumes a previously started experiment from the point recorded in {@code state}.
-     * <p>
-     * The scheduler is run first, then the step matching {@code state} is (re-)executed.
-     * On any failure the experiment is published as FAILED, the job is published as FAILED
-     * (or the task, when building the job identifier hits a {@link NullPointerException}),
-     * the error details are saved, the failure is sent to the notifier, and the exception
-     * is rethrown wrapped in a {@link GFacException}.
-     *
-     * @param jobExecutionContext context of the experiment being recovered
-     * @param state               last recorded GFac experiment state
-     * @throws GFacException if recovery fails or {@code state} is not handled by the switch
-     */
-    private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
-        String experimentID = jobExecutionContext.getExperimentID();
-        try {
-            // Scheduler will decide the execution flow of handlers and provider which handles the job.
-            Scheduler.schedule(jobExecutionContext);
-
-            // Handlers are not skipped here; a handler that must not run again during a
-            // re-run has to implement that logic itself.
-            switch (state) {
-                case INHANDLERSINVOKING:
-                    reInvokeInFlowHandlers(jobExecutionContext);
-                    // NOTE(review): no break - execution continues into INHANDLERSINVOKED so the
-                    // provider runs after the in-flow handlers; presumably intentional, confirm.
-                case INHANDLERSINVOKED:
-                    invokeProviderExecute(jobExecutionContext);
-                    break;
-                case PROVIDERINVOKING:
-                    reInvokeProviderExecute(jobExecutionContext, true);
-                    break;
-                case JOBSUBMITTED:
-                    reInvokeProviderExecute(jobExecutionContext, false);
-                    // NOTE(review): no break - execution continues into PROVIDERINVOKED; confirm
-                    // that this fall-through is intended.
-                case PROVIDERINVOKED:
-                    // no need to re-run the job
-                    log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
-                    if (!GFacUtils.isSynchronousMode(jobExecutionContext)) {
-                        monitorJob(jobExecutionContext);
-                    } else {
-                        // TODO - Need to handle this correctly , for now we will invoke ouput handlers.
-                        invokeOutFlowHandlers(jobExecutionContext);
-                    }
-                    break;
-                case OUTHANDLERSINVOKING:
-                    reInvokeOutFlowHandlers(jobExecutionContext);
-                    break;
-                case OUTHANDLERSINVOKED:
-                case COMPLETED:
-                    GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.COMPLETED);
-                    break;
-                case FAILED:
-                    GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
-                    break;
-                case UNKNOWN:
-                    log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
-                    break;
-                default:
-                    throw new GFacException("Un-handled GfacExperimentState : " + state.name());
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage(), e);
-            // Throwable.getCause() may return null; fall back to the exception itself so that
-            // building the error text can never throw a second NullPointerException from
-            // inside the failure handling below.
-            Throwable rootCause = e.getCause() != null ? e.getCause() : e;
-            String errorDetails = rootCause.toString();
-            try {
-                // we make the experiment as failed due to exception scenario
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                JobIdentifier jobIdentity = new JobIdentifier(
-                        jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext, errorDetails, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (NullPointerException e1) {
-                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-                // Fall back to a task-level FAILED status when the job identifier could not be built.
-                TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext, errorDetails, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-
-            }
-            jobExecutionContext.setProperty(ERROR_SENT, "true");
-            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
-            throw new GFacException(e.getMessage(), e);
-        }
-    }
-
-    private void monitorJob(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException {
-        GFacProvider provider = jobExecutionContext.getProvider();
-        if (provider != null) {
-            provider.monitor(jobExecutionContext);
-        }
-        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
-            invokeOutFlowHandlers(jobExecutionContext);
-        }
-
-    }
-
-    private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
-        // Scheduler will decide the execution flow of handlers and provider
-        // which handles
-        // the job.
-        try {
-            Scheduler.schedule(jobExecutionContext);
-
-            // Executing in handlers in the order as they have configured in
-            // GFac configuration
-            // here we do not skip handler if some handler does not have to be
-            // run again during re-run it can implement
-            // that logic in to the handler
-            if (!isCancelling(jobExecutionContext)) {
-                invokeInFlowHandlers(jobExecutionContext); // to keep the
-                // consistency we always
-                // try to re-run to
-                // avoid complexity
-            } else {
-                log.info("Experiment is cancelled, so launch operation is stopping immediately");
-                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
-                return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
-            }
-            // if (experimentID != null){
-            // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
-            // }
-
-            // After executing the in handlers provider instance should be set
-            // to job execution context.
-            // We get the provider instance and execute it.
-            if (!isCancelling(jobExecutionContext)) {
-                invokeProviderExecute(jobExecutionContext);
-            } else {
-                log.info("Experiment is cancelled, so launch operation is stopping immediately");
-                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
-                return;
-            }
-        } catch (Exception e) {
-            try {
-                // we make the experiment as failed due to exception scenario
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                // monitorPublisher.publish(new
-                // ExperimentStatusChangedEvent(new
-                // ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                // ExperimentState.FAILED));
-                // Updating the task status if there's any task associated
-                // monitorPublisher.publish(new TaskStatusChangeRequest(
-                // new TaskIdentity(jobExecutionContext.getExperimentID(),
-                // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                // jobExecutionContext.getTaskData().getTaskID()),
-                // TaskState.FAILED
-                // ));
-                JobIdentifier jobIdentity = new JobIdentifier(
-                        jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-            } catch (NullPointerException e1) {
-                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-                //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
-                // Updating the task status if there's any task associated
-                TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
-
-            }
-            jobExecutionContext.setProperty(ERROR_SENT, "true");
-            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
-            throw new GFacException(e.getMessage(), e);
-        }
-    }
-
-    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
-        GFacProvider provider = jobExecutionContext.getProvider();
-        if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
-            initProvider(provider, jobExecutionContext);
-            executeProvider(provider, jobExecutionContext);
-            disposeProvider(provider, jobExecutionContext);
-            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
-        }
-        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
-            invokeOutFlowHandlers(jobExecutionContext);
-        }
-    }
-
-    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
-        GFacProvider provider = jobExecutionContext.getProvider();
-        if (provider != null) {
-            if (submit) {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
-                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
-                if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
-                    initProvider(provider, jobExecutionContext);
-                    executeProvider(provider, jobExecutionContext);
-                    disposeProvider(provider, jobExecutionContext);
-                } else {
-                    provider.recover(jobExecutionContext);
-                }
-                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
-            } else {
-                disposeProvider(provider, jobExecutionContext);
-                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
-            }
-        }
-
-        if (GFacUtils.isSynchronousMode(jobExecutionContext))  {
-            invokeOutFlowHandlers(jobExecutionContext);
-        }
-
-    }
-
-    private boolean invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
-        GFacProvider provider = jobExecutionContext.getProvider();
-        if (provider != null) {
-            initProvider(provider, jobExecutionContext);
-            cancelProvider(provider, jobExecutionContext);
-            disposeProvider(provider, jobExecutionContext);
-        }
-        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
-            invokeOutFlowHandlers(jobExecutionContext);
-        }
-        return true;
-    }
-
-    // TODO - Did refactoring, but need to recheck the logic again.
-    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
-        GFacProvider provider = jobExecutionContext.getProvider();
-        if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
-            if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
-                initProvider(provider, jobExecutionContext);
-                cancelProvider(provider, jobExecutionContext);
-                disposeProvider(provider, jobExecutionContext);
-            } else {
-                provider.recover(jobExecutionContext);
-            }
-            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
-        }
-
-        if (GFacUtils.isSynchronousMode(jobExecutionContext))
-
-        {
-            invokeOutFlowHandlers(jobExecutionContext);
-        }
-
-    }
-
-
-    private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            provider.initialize(jobExecutionContext);
-        } catch (Exception e) {
-            throw new GFacException("Error while initializing provider " + provider.getClass().getName() + ".", e);
-        }
-    }
-
-    private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            provider.execute(jobExecutionContext);
-        } catch (Exception e) {
-            throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
-        }
-    }
-
-    /**
-     * Calls {@code cancelJob} on the provider and converts any failure into a
-     * {@link GFacException} that names the provider class and keeps the original cause.
-     *
-     * @param provider            provider asked to cancel the job
-     * @param jobExecutionContext context handed to the provider
-     * @return whatever the provider's {@code cancelJob} call returns
-     * @throws GFacException if the provider's cancelJob call throws
-     */
-    private boolean cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            return provider.cancelJob(jobExecutionContext);
-        } catch (Exception e) {
-            // The message used to be a copy of the one in executeProvider, which made a
-            // failed cancellation look like a failed execution in the logs.
-            throw new GFacException("Error while cancelling job using provider " + provider.getClass().getName() + ".", e);
-        }
-    }
-
-    private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
-        try {
-            provider.dispose(jobExecutionContext);
-        } catch (Exception e) {
-            throw new GFacException("Error while invoking provider " + provider.getClass().getName() + " dispose method.", e);
-        }
-    }
-
-//    private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
-//        String workflowNodeID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
-//        String topic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
-//        String brokerUrl = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
-//        jobExecutionContext.getNotificationService().registerListener(
-//                new WorkflowTrackingListener(workflowInstanceID, workflowNodeID, brokerUrl, topic));
-//
-//    }
-
-    /**
-     * Instantiates and invokes every configured in-flow handler in order.
-     * <p>
-     * INHANDLERSINVOKING is published before the loop and INHANDLERSINVOKED after it.
-     * Before each handler the cancellation flag is checked; when it is set a CANCELED
-     * task status is published and the loop stops. A handler's state is set to COMPLETED
-     * only after its {@code invoke} call returns normally.
-     *
-     * @param jobExecutionContext context passed to every handler
-     * @throws GFacException if a handler cannot be loaded, instantiated or executed
-     */
-    private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
-        try {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                    , GfacExperimentState.INHANDLERSINVOKING));
-            for (GFacHandlerConfig handlerClassName : handlers) {
-                if (!isCancelling(jobExecutionContext)) {
-                    Class<? extends GFacHandler> handlerClass;
-                    GFacHandler handler;
-                    try {
-                        GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
-                        handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                        handler = handlerClass.newInstance();
-                        handler.initProperties(handlerClassName.getProperties());
-                    } catch (ClassNotFoundException e) {
-                        throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                    } catch (InstantiationException | IllegalAccessException e) {
-                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                    }
-                    try {
-                        handler.invoke(jobExecutionContext);
-                        GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
-                        // if exception thrown before that we do not make it finished
-                    } catch (GFacHandlerException e) {
-                        // Keep the handler exception itself as the cause: passing only
-                        // e.getCause() dropped the handler's own message and, when the
-                        // cause was null, every trace of what went wrong.
-                        throw new GFacException("Error Executing a InFlow Handler", e);
-                    }
-                } else {
-                    log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
-                    GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
-                    break;
-                }
-            }
-            // NOTE(review): this is also published after a cancellation break - confirm that is intended.
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                    , GfacExperimentState.INHANDLERSINVOKED));
-        } catch (Exception e) {
-            throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        if (!initialized) {
-            throw new GFacException("Initialize the Gfac instance before use it");
-        }
-        String experimentPath = null;
-        try {
-            experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
-            if (curatorClient.checkExists().forPath(experimentPath) == null) {
-                log.error("Experiment is already finalized so no output handlers will be invoked");
-                return;
-            }
-            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-            List<GFacHandlerConfig> handlers = null;
-            if (gFacConfiguration != null) {
-                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-            } else {
-                try {
-                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
-                            jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
-                } catch (Exception e) {
-                    log.error("Error constructing job execution context during outhandler invocation");
-                    throw new GFacException(e);
-                }
-            }
-            try {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-                for (GFacHandlerConfig handlerClassName : handlers) {
-                    if (!isCancel(jobExecutionContext)) {
-                        Class<? extends GFacHandler> handlerClass;
-                        GFacHandler handler;
-                        try {
-                            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
-                            handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                            handler = handlerClass.newInstance();
-                            handler.initProperties(handlerClassName.getProperties());
-                        } catch (ClassNotFoundException e) {
-                            log.error(e.getMessage());
-                            throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                        } catch (InstantiationException | IllegalAccessException e) {
-                            log.error(e.getMessage());
-                            throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                        }
-                        try {
-                            handler.invoke(jobExecutionContext);
-                            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
-                        } catch (Exception e) {
-                            GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
-                            try {
-                                StringWriter errors = new StringWriter();
-                                e.printStackTrace(new PrintWriter(errors));
-                                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                            } catch (GFacException e1) {
-                                log.error(e1.getLocalizedMessage());
-                            }
-                            throw new GFacException(e);
-                        }
-                    } else {
-                        log.info("Experiment execution is cancelled, so OutHandler invocation is stopped");
-                        if (isCancelling(jobExecutionContext)) {
-                            GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
-                        }
-                        break;
-                    }
-                }
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
-            } catch (Exception e) {
-                throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
-            }
-        } catch (Exception e) {
-            throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
-        }
-
-        // At this point all the execution is finished so we update the task and experiment statuses.
-        // Handler authors does not have to worry about updating experiment or task statuses.
-//        monitorPublisher.publish(new
-//                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
-//                ExperimentState.COMPLETED));
-        // Updating the task status if there's any task associated
-        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                jobExecutionContext.getExperimentID(),
-                jobExecutionContext.getGatewayID());
-        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
-
-    }
-
-    // TODO - Did refactoring, but need to recheck the logic again.
-    /**
-     * Recovery counterpart of the in-flow handler invocation.
-     * <p>
-     * For every configured in-flow handler the recorded handler state is read. A handler
-     * whose recorded state is UNKNOWN or INVOKING is invoked normally; any other handler
-     * is asked to {@code recover}. Each handler's state is set to COMPLETED afterwards.
-     * INHANDLERSINVOKING is published before the loop and INHANDLERSINVOKED after it.
-     * On failure the stack trace is saved as error details before the failure is rethrown.
-     *
-     * @param jobExecutionContext context passed to every handler
-     * @throws GFacException if a handler cannot be loaded, instantiated, executed or recovered
-     */
-    private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
-        try {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                    , GfacExperimentState.INHANDLERSINVOKING));
-            for (GFacHandlerConfig handlerClassName : handlers) {
-                Class<? extends GFacHandler> handlerClass;
-                GFacHandler handler;
-                try {
-                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                    handler = handlerClass.newInstance();
-                    // read the previously recorded state before the node is (re)created as INVOKING
-                    GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
-                    GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
-                    handler.initProperties(handlerClassName.getProperties());
-                    if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
-                        log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
-                        handler.invoke(jobExecutionContext);
-                    } else {
-                        // if these already ran we re-run only recoverable handlers
-                        log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
-                        handler.recover(jobExecutionContext);
-                    }
-                    GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
-                } catch (GFacHandlerException e) {
-                    // Keep the handler exception itself as the cause: passing only
-                    // e.getCause() dropped the handler's own message and, when the cause
-                    // was null, every trace of what went wrong.
-                    throw new GFacException("Error Executing a InFlow Handler", e);
-                } catch (ClassNotFoundException e) {
-                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                } catch (InstantiationException | IllegalAccessException e) {
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                }
-            }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                    , GfacExperimentState.INHANDLERSINVOKED));
-        } catch (Exception e) {
-            try {
-                // store the full stack trace as the error details
-                StringWriter errors = new StringWriter();
-                e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                // failing to save the details must not hide the original failure
-                log.error(e1.getLocalizedMessage());
-            }
-            // This method re-runs the in-flow (input) handlers; the message used to say "output".
-            throw new GFacException("Error while re-invoking input handlers", e);
-        }
-    }
-
-    // TODO - Did refactoring, but need to recheck the logic again.
-    @Override
-    public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        if (!initialized) {
-            throw new GFacException("Initialize the Gfac instance before use it");
-        }
-        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-        List<GFacHandlerConfig> handlers = null;
-        if (gFacConfiguration != null) {
-            handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-        } else {
-            try {
-                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
-            } catch (Exception e) {
-                log.error("Error constructing job execution context during outhandler invocation");
-                throw new GFacException(e);
-            }
-            launch(jobExecutionContext);
-        }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-        for (GFacHandlerConfig handlerClassName : handlers) {
-            Class<? extends GFacHandler> handlerClass;
-            GFacHandler handler;
-            try {
-                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                handler = handlerClass.newInstance();
-                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
-                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
-                if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
-                    log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
-                    handler.initProperties(handlerClassName.getProperties());
-                    handler.invoke(jobExecutionContext);
-                } else {
-                    // if these already ran we re-run only recoverable handlers
-                    handler.recover(jobExecutionContext);
-                }
-                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
-            } catch (ClassNotFoundException e) {
-                try {
-                    StringWriter errors = new StringWriter();
-                    e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                } catch (GFacException e1) {
-                    log.error(e1.getLocalizedMessage());
-                }
-                log.error(e.getMessage());
-                throw new GFacException("Cannot load handler class " + handlerClassName, e);
-            } catch (InstantiationException e) {
-                try {
-                    StringWriter errors = new StringWriter();
-                    e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                } catch (GFacException e1) {
-                    log.error(e1.getLocalizedMessage());
-                }
-                log.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-            } catch (IllegalAccessException e) {
-                try {
-                    StringWriter errors = new StringWriter();
-                    e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                } catch (GFacException e1) {
-                    log.error(e1.getLocalizedMessage());
-                }
-                log.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-            } catch (Exception e) {
-                // TODO: Better error reporting.
-                try {
-                    StringWriter errors = new StringWriter();
-                    e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                } catch (GFacException e1) {
-                    log.error(e1.getLocalizedMessage());
-                }
-                throw new GFacException("Error Executing a OutFlow Handler", e);
-            }
-        }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
-
-        // At this point all the execution is finished so we update the task and experiment statuses.
-        // Handler authors does not have to worry about updating experiment or task statuses.
-//        monitorPublisher.publish(new
-//                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
-//                ExperimentState.COMPLETED));
-        // Updating the task status if there's any task associated
-
-        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                jobExecutionContext.getExperimentID(),
-                jobExecutionContext.getGatewayID());
-        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
-    }
-
-    /**
-     * Checks whether the experiment owning the given execution context has already been cancelled.
-     * <p>
-     * The experiment status is read from the registry on every call. A missing status, a missing
-     * state, or a registry failure all yield {@code false}; a registry failure is additionally
-     * logged so that it is not silently lost.
-     *
-     * @param executionContext context whose experiment id is used for the registry lookup
-     * @return {@code true} only if the registry reports {@code ExperimentState.CANCELED}
-     */
-    private boolean isCancelled(JobExecutionContext executionContext) {
-        try {
-            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            // A null state compares unequal to the enum constant, so no separate null check is needed.
-            return status != null && status.getExperimentState() == ExperimentState.CANCELED;
-        } catch (RegistryException e) {
-            // Best effort: a failed lookup is still treated as "not cancelled", but the failure is recorded.
-            log.error("Error while reading the status of experiment " + executionContext.getExperimentID() + " from the registry", e);
-            return false;
-        }
-    }
-
-    /**
-     * Checks whether a cancel request has arrived for the experiment owning the given execution context.
-     * <p>
-     * The experiment status is read from the registry on every call. A missing status, a missing
-     * state, or a registry failure all yield {@code false}; a registry failure is additionally
-     * logged so that it is not silently lost.
-     *
-     * @param executionContext context whose experiment id is used for the registry lookup
-     * @return {@code true} only if the registry reports {@code ExperimentState.CANCELING}
-     */
-    private boolean isCancelling(JobExecutionContext executionContext) {
-        try {
-            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            // A null state compares unequal to the enum constant, so no separate null check is needed.
-            return status != null && status.getExperimentState() == ExperimentState.CANCELING;
-        } catch (RegistryException e) {
-            // Best effort: a failed lookup is still treated as "not cancelling", but the failure is recorded.
-            log.error("Error while reading the status of experiment " + executionContext.getExperimentID() + " from the registry", e);
-            return false;
-        }
-    }
-
-    /**
-     * Checks whether the experiment owning the given execution context is being cancelled or has
-     * already been cancelled.
-     * <p>
-     * The experiment status is read from the registry on every call. A missing status, a missing
-     * state, or a registry failure all yield {@code false}; a registry failure is additionally
-     * logged so that it is not silently lost.
-     *
-     * @param jobExecutionContext context whose experiment id is used for the registry lookup
-     * @return {@code true} if the registry reports {@code ExperimentState.CANCELING} or
-     *         {@code ExperimentState.CANCELED}, {@code false} otherwise
-     */
-    private boolean isCancel(JobExecutionContext jobExecutionContext) {
-        try {
-            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, jobExecutionContext.getExperimentID());
-            if (status == null) {
-                return false;
-            }
-            // A null state matches neither constant, so the result is false without an explicit null check.
-            ExperimentState experimentState = status.getExperimentState();
-            return experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED;
-        } catch (RegistryException e) {
-            // Best effort: a failed lookup is still treated as "no cancellation", but the failure is recorded.
-            log.error("Error while reading the status of experiment " + jobExecutionContext.getExperimentID() + " from the registry", e);
-            return false;
-        }
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
deleted file mode 100644
index 962f0ec..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.cpi;
-
-import org.airavata.appcatalog.cpi.AppCatalog;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.curator.framework.CuratorFramework;
-
-/**
- * This is the GFac CPI interface, which needs to be implemented by an internal class. Its main entry point is the
- * method that submits a job to the resource; it also exposes operations to run the out-flow handlers and to cancel
- * an experiment. The data required for the job has to be stored in the registry prior to invoking this object.
- */
-public interface GFac {
-
-    /**
-     * Initialization method; it must be called once before any other method is used.
-     *
-     * @param registry      registry instance handed to the implementation
-     * @param appCatalog    application catalog instance handed to the implementation
-     * @param curatorClient Curator framework client handed to the implementation
-     * @param publisher     monitor publisher handed to the implementation
-     * @return boolean result of the initialization (NOTE(review): the meaning of {@code false} is not
-     *         specified here - confirm against the implementation)
-     */
-    public boolean init(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient, MonitorPublisher publisher);
-
-    /**
-     * This is the job launching method outsiders of GFac can use. It will invoke the GFac handler chain and providers
-     * and update the registry accordingly, so that users can query the database to retrieve status and output from the registry.
-     *
-     * @param experimentID id of the experiment to launch
-     * @param taskID       id of the task (NOTE(review): meaning inferred from the name - confirm)
-     * @param gatewayID    id of the gateway (NOTE(review): meaning inferred from the name - confirm)
-     * @param tokenId      NOTE(review): presumably a credential token id - confirm against the implementation
-     * @return boolean successful acceptance of the job execution returns a true value
-     * @throws org.apache.airavata.gfac.GFacException
-     */
-    public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException;
-
-    /**
-     * This method can be used in a handler to invoke the out-flow handlers asynchronously.
-     *
-     * @param jobExecutionContext execution context passed to the out-flow handlers
-     * @throws GFacException
-     */
-    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
-
-    /**
-     * This method can be used to handle the re-run case asynchronously.
-     *
-     * @param jobExecutionContext execution context passed to the out-flow handlers
-     * @throws GFacException
-     */
-    public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
-
-    /**
-     * This operation can be used to cancel an already running experiment.
-     *
-     * @param experimentID id of the experiment to cancel
-     * @param taskID       id of the task (NOTE(review): meaning inferred from the name - confirm)
-     * @param gatewayID    id of the gateway (NOTE(review): meaning inferred from the name - confirm)
-     * @param tokenId      NOTE(review): presumably a credential token id - confirm against the implementation
-     * @return Successful cancellation will return true
-     * @throws GFacException
-     */
-    public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException;
-
-}