You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:41 UTC
[24/39] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and created gfac-impl,
removed implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..9f369b1
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Map;
+
+//import org.apache.airavata.schemas.gfac.GsisshHostType;
+
+public class GSISSHProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { // No-op: the supplied properties are ignored by this provider.
+
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ try {
+ String hostAddress = jobExecutionContext.getHostName();
+ if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ } catch (GFacException e) {
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+
+    /**
+     * Builds the job descriptor for this experiment, submits it as a batch job through the cluster held by
+     * the host's {@link GSISecurityContext}, saves the resulting job status and hands the job over to
+     * {@link #monitor(JobExecutionContext)}.
+     * <p>
+     * Whatever the outcome, the data collected so far ({@code jobDesc=...} and, once known,
+     * {@code ,jobId=...}) is saved as handler data in the {@code finally} block; this is the format
+     * {@link #recover(JobExecutionContext)} reads back.
+     *
+     * @param jobExecutionContext context of the job to submit
+     * @throws GFacProviderException if anything fails during submission or monitoring hand-over; the job
+     *                               is saved as FAILED and the stack trace is stored as error details first
+     */
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        log.info("Invoking GSISSH Provider Invoke ...");
+        StringBuffer data = new StringBuffer();
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        JobDetails jobDetails = new JobDetails();
+        Cluster cluster = null;
+
+        try {
+            if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
+                cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
+            }
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            }
+            log.info("Successfully retrieved the Security Context");
+
+            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
+            jobDetails.setJobName(jobDescriptor.getJobName());
+
+            // Serialize the descriptor once and reuse it for the log, the recovery data and the job details.
+            String jobDescriptionXml = jobDescriptor.toXML();
+            log.info(jobDescriptionXml);
+            data.append("jobDesc=").append(jobDescriptionXml);
+            jobDetails.setJobDescription(jobDescriptionXml);
+
+            String jobID = cluster.submitBatchJob(jobDescriptor);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobID == null) {
+                // NOTE(review): the job is marked FAILED here but is still passed to monitor() below - confirm
+                // whether monitoring a job without an id is intended.
+                jobDetails.setJobID("none");
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            } else {
+                // Keep only the part of the returned id that precedes the first dot.
+                jobDetails.setJobID(jobID.split("\\.")[0]);
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+            }
+            data.append(",jobId=").append(jobDetails.getJobID());
+
+            // The job has been submitted; pass it on so that its progress gets monitored.
+            monitor(jobExecutionContext);
+        } catch (Exception e) {
+            String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
+            // Log the throwable as well so the stack trace is not lost.
+            log.error(error, e);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            StringWriter errors = new StringWriter();
+            e.printStackTrace(new PrintWriter(errors));
+            GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } finally {
+            log.info("Saving data for future recovery: ");
+            log.info(data.toString());
+            GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName());
+        }
+
+    }
+
+ public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
+/* NOTE: this method is currently a no-op; its previous daemon-handler based implementation is kept below, commented out.
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+ jobExecutionContext.setProperty("cancel","true");
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }*/
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // No-op: this provider currently performs no work on dispose.
+ }
+
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ log.info("canceling the job status in GSISSHProvider!!!!!");
+ JobDetails jobDetails = jobExecutionContext.getJobDetails();
+ String hostName = jobExecutionContext.getHostName();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(hostName) == null) {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ }
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getPbsCluster();
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ // This installed path is a mandetory field, because this could change based on the computing resource
+ if(jobDetails == null) {
+ log.error("There is not JobDetails so cancelations cannot perform !!!");
+ return false;
+ }
+ if (jobDetails.getJobID() != null) {
+ // if this operation success without any exceptions, we can assume cancel operation succeeded.
+ cluster.cancelJob(jobDetails.getJobID());
+ } else {
+ log.error("No Job Id is set, so cannot perform the cancel operation !!!");
+ return false;
+ }
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
+ return true;
+ // we know this host is type GsiSSHHostType
+ } catch (SSHApiException e) {
+ String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ } catch (Exception e) {
+ String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ }
+ }
+
+    /**
+     * Recovers a job after a GFac failure using the handler data saved by
+     * {@link #execute(JobExecutionContext)} ({@code jobDesc=<xml>,jobId=<id>}).
+     * <p>
+     * When no usable job id was saved (no data, no {@code ,jobId=} part, or an id of {@code "none"} or
+     * empty) the job is executed again. Otherwise the saved description and id are put back into the
+     * context and the job is handed to {@link #monitor(JobExecutionContext)}.
+     *
+     * @param jobExecutionContext context of the job to recover
+     * @throws GFacProviderException if the saved data cannot be read, if re-execution fails, or if the
+     *                               job cannot be handed over to monitoring
+     */
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
+        String hostName = jobExecutionContext.getHostName();
+
+        String pluginData;
+        try {
+            pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
+        } catch (Exception e) {
+            // Without the saved data it is unknown whether the job was submitted; fail instead of
+            // going on to monitor an empty job id.
+            log.error("Error while recovering provider", e);
+            throw new GFacProviderException("Error recovering provider", e);
+        }
+
+        String[] recovered = parseRecoveryData(pluginData);
+        if (recovered == null) {
+            reExecute(jobExecutionContext);
+            return;
+        }
+        String jobDesc = recovered[0];
+        String jobId = recovered[1];
+
+        log.info("Following data have recovered: ");
+        log.info("Job Description: " + jobDesc);
+        log.info("Job Id: " + jobId);
+        if ("none".equals(jobId) || "".equals(jobId)) {
+            reExecute(jobExecutionContext);
+            return;
+        }
+
+        try {
+            // There is enough data to recover: restore the job details and delegate to monitoring.
+            JobDetails jobDetails = new JobDetails();
+            jobDetails.setJobDescription(jobDesc);
+            jobDetails.setJobID(jobId);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                try {
+                    GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+            monitor(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error while recover the job", e);
+            throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
+        }
+    }
+
+    /** Runs the job again during recovery, reporting any failure as a {@link GFacProviderException}. */
+    private void reExecute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        try {
+            this.execute(jobExecutionContext);
+        } catch (GFacException e) {
+            log.error("Error while recovering provider", e);
+            throw new GFacProviderException("Error recovering provider", e);
+        }
+    }
+
+    /**
+     * Splits saved handler data of the form {@code jobDesc=<xml>,jobId=<id>} into description and id.
+     * The last {@code ,jobId=} marker is used so that commas inside the description do not matter.
+     *
+     * @return a two-element array {@code {jobDesc, jobId}}, or {@code null} when the data is missing or
+     *         does not contain both parts
+     */
+    private static String[] parseRecoveryData(String pluginData) {
+        final String descPrefix = "jobDesc=";
+        final String idMarker = ",jobId=";
+        if (pluginData == null || !pluginData.startsWith(descPrefix)) {
+            return null;
+        }
+        int idStart = pluginData.lastIndexOf(idMarker);
+        if (idStart < 0) {
+            return null;
+        }
+        return new String[]{
+                pluginData.substring(descPrefix.length(), idStart),
+                pluginData.substring(idStart + idMarker.length())
+        };
+    }
+
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = null;
+ try {
+ sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ } catch (AppCatalogException e) {
+ throw new GFacException("Error while reading compute resource", e);
+ }
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ try {
+ EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+ sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+ } catch (AiravataException e) {
+ throw new GFacHandlerException("Error while activating email job monitoring ", e);
+ }
+ return;
+ }
+ }
+/*
+ // if email monitor is not activeated or not configure we use pull or push monitor
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ String jobID = jobExecutionContext.getJobDetails().getJobID();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+ log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+
+ }*/
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
new file mode 100644
index 0000000..85e9e29
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.AbstractSecurityContext;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles GRID related security: a security context that additionally carries the {@link Cluster}
+ * associated with it.
+ */
+public class GSISecurityContext extends AbstractSecurityContext {
+
+    protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
+
+    /** Cluster attached to this context; {@code null} until one is supplied. */
+    private Cluster pbsCluster;
+
+    /**
+     * Creates a context backed by a credential reader and request data, with a cluster attached.
+     *
+     * @param credentialReader reader passed on to the base class
+     * @param requestData      request data passed on to the base class
+     * @param pbsCluster       cluster to attach to this context
+     */
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) {
+        super(credentialReader, requestData);
+        this.pbsCluster = pbsCluster;
+    }
+
+    /**
+     * Creates a context backed by a credential reader and request data, without a cluster.
+     *
+     * @param credentialReader reader passed on to the base class
+     * @param requestData      request data passed on to the base class
+     */
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+    /**
+     * Creates a context that only carries a cluster.
+     *
+     * @param pbsCluster cluster to attach to this context
+     */
+    public GSISecurityContext(Cluster pbsCluster) {
+        // Assign the field directly: calling the overridable setter from a constructor would run
+        // subclass code before the subclass has finished initializing.
+        this.pbsCluster = pbsCluster;
+    }
+
+    /**
+     * @return the cluster attached to this context, or {@code null} if none has been set
+     */
+    public Cluster getPbsCluster() {
+        return pbsCluster;
+    }
+
+    /**
+     * @param pbsCluster cluster to attach to this context
+     */
+    public void setPbsCluster(Cluster pbsCluster) {
+        this.pbsCluster = pbsCluster;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
new file mode 100644
index 0000000..a3e0241
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+
+public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
+ protected static final Logger log = LoggerFactory.getLogger(TokenizedMyProxyAuthInfo.class);
+
+ public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90; // getCredentials() renews the cached credential once its remaining lifetime drops below this value
+
+ private GSSCredential gssCredentials = null; // cached credential; filled in lazily by getCredentials()
+
+
+ private CredentialReader credentialReader; // may be null; getCredentialsFromStore() then obtains one from GFacUtils
+
+ private RequestData requestData; // supplies token id, gateway id and the MyProxy connection settings
+
+ public static final String X509_CERT_DIR = "X509_CERT_DIR"; // property key under which the constructors store the trusted certificate location
+
+
+ static { // runs once at class load: registers the Globus security provider, then configures the trusted certificate path
+ Security.addProvider(new GlobusProvider());
+ try {
+ setUpTrustedCertificatePath();
+ } catch (ApplicationSettingsException e) { // only a settings failure is logged here; the RuntimeException thrown for an unreadable path is not caught
+ log.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+    /**
+     * Publishes the given trusted certificate location as a system property.
+     *
+     * @param trustedCertificatePath location of the trusted certificates
+     * @throws RuntimeException if the location does not exist or cannot be read
+     */
+    public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+        final File certificateLocation = new File(trustedCertificatePath);
+
+        if (certificateLocation.exists() && certificateLocation.canRead()) {
+            System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, certificateLocation.getAbsolutePath());
+            return;
+        }
+
+        // Log the working directory to help diagnose a wrongly resolved relative path.
+        log.info("Current directory " + new File(".").getAbsolutePath());
+        throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+    }
+
+    /**
+     * Reads the trusted certificate location from the server settings and applies it.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+        setUpTrustedCertificatePath(ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+    }
+
+ public TokenizedMyProxyAuthInfo(CredentialReader credentialReader, RequestData requestData) {
+ this.credentialReader = credentialReader;
+ this.requestData = requestData;
+ try {
+ properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+ } catch (ApplicationSettingsException e) {
+ log.error("Error while reading server properties", e);
+ };
+ }
+
+    /**
+     * Creates the authentication info without a credential reader; apart from that it performs the same
+     * initialization as the two-argument constructor.
+     *
+     * @param requestData request data carrying the token and MyProxy settings
+     */
+    public TokenizedMyProxyAuthInfo(RequestData requestData) {
+        this(null, requestData);
+    }
+
+ public GSSCredential getCredentials() throws SecurityException {
+
+ if (gssCredentials == null) {
+
+ try {
+ gssCredentials = getCredentialsFromStore();
+ } catch (Exception e) {
+ log.error("An exception occurred while retrieving credentials from the credential store. " +
+ "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId(), e);
+ }
+
+ if (gssCredentials == null) {
+ System.out.println("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials");
+ try {
+ gssCredentials = getDefaultCredentials();
+ } catch (Exception e) {
+ throw new SecurityException("Error retrieving my proxy using username password");
+ }
+ }
+ // if still null, throw an exception
+ if (gssCredentials == null) {
+ throw new SecurityException("Unable to retrieve my proxy credentials to continue operation.");
+ }
+ } else {
+ try {
+ if (gssCredentials.getRemainingLifetime() < CREDENTIAL_RENEWING_THRESH_HOLD) {
+ try {
+ return renewCredentials();
+ } catch (Exception e) {
+ throw new SecurityException("Error renewing credentials", e);
+ }
+ }
+ } catch (GSSException e) {
+ throw new SecurityException("Unable to retrieve remaining life time from credentials.", e);
+ }
+ }
+
+ return gssCredentials;
+ }
+
+
+    /**
+     * Reads the credentials from credential store.
+     * <p>
+     * If no credential reader has been set, one is requested from {@link GFacUtils}. The credential is
+     * looked up by the gateway id and token id of the request data and is only converted when it is a
+     * {@link CertificateCredential}.
+     *
+     * @return If token is found in the credential store, will return a valid credential. Else returns null
+     *         (also when no credential reader is available or the stored credential has another type).
+     * @throws Exception If an error occurred while retrieving credentials.
+     */
+    public GSSCredential getCredentialsFromStore() throws Exception {
+
+        if (getCredentialReader() == null) {
+            credentialReader = GFacUtils.getCredentialReader();
+            if (credentialReader == null) {
+                return null;
+            }
+        }
+
+        Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+                getRequestData().getTokenId());
+
+        if (credential == null) {
+            log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+                    + "gateway id - " + getRequestData().getGatewayId());
+            return null;
+        }
+
+        if (!(credential instanceof CertificateCredential)) {
+            log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+                    "Credential type - " + credential.getClass().getName());
+            return null;
+        }
+
+        log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+                " gateway id - " + getRequestData().getGatewayId());
+
+        CertificateCredential certificateCredential = (CertificateCredential) credential;
+
+        X509Certificate[] certificates = certificateCredential.getCertificates();
+        X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates);
+
+        // The exported credential must never be written to standard output: it is secret material.
+        return new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT);
+    }
+
+ /**
+ * Renew GSSCredentials.
+ * Before executing we need to add current host as a trusted renewer. Note to renew credentials
+ * we dont need user name and password.
+ * To do that execute following command
+ * > myproxy-logon -t <LIFETIME></LIFETIME> -s <MY PROXY SERVER> -l <USER NAME>
+ * E.g :- > myproxy-logon -t 264 -s myproxy.teragrid.org -l us3
+ * Enter MyProxy pass phrase:
+ * A credential has been received for user us3 in /tmp/x509up_u501.
+ * > myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l ogce -s myproxy.teragrid.org
+ *
+ * @return Renewed credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential renewCredentialsAsATrustedHost() throws GFacException, ApplicationSettingsException {
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ GetParams getParams = new GetParams();
+ getParams.setAuthzCreds(gssCredentials);
+ getParams.setUserName(getRequestData().getMyProxyUserName());
+ getParams.setLifetime(getRequestData().getMyProxyLifeTime());
+ try {
+ return myproxy.get(gssCredentials, getParams);
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while renewing security credentials.", e);
+ }
+ }
+
+
+ /**
+ * Gets the default proxy certificate.
+ *
+ * @return Default my proxy credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while retrieving credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException {
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ try {
+ return myproxy.get(getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+ getRequestData().getMyProxyLifeTime());
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while retrieving default security credentials.", e);
+ }
+ }
+
+
+ /**
+ * Renews credentials. First try to renew credentials as a trusted renewer. If that failed
+ * use user name and password to renew credentials.
+ *
+ * @return Renewed credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential renewCredentials() throws GFacException, ApplicationSettingsException {
+
+ // First try to renew credentials as a trusted renewer
+ try {
+ gssCredentials = renewCredentialsAsATrustedHost();
+ } catch (Exception e) {
+ log.warn("Renewing credentials as a trusted renewer failed", e);
+ gssCredentials = getDefaultCredentials();
+ }
+
+ return gssCredentials;
+ }
+
+ /**
+ * Gets a new proxy certificate given current credentials.
+ *
+ * @return The short lived GSSCredentials
+ * @throws org.apache.airavata.gfac.GFacException If an error is occurred while retrieving credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential getProxyCredentials() throws GFacException, ApplicationSettingsException {
+
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ try {
+ return myproxy.get(gssCredentials, getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+ getRequestData().getMyProxyLifeTime());
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while renewing security credentials using user/password.", e);
+ }
+ }
+
+ public void setGssCredentials(GSSCredential gssCredentials) { // replaces the cached credential that getCredentials() returns
+ this.gssCredentials = gssCredentials;
+ }
+
+ public CredentialReader getCredentialReader() { // may return null; getCredentialsFromStore() then asks GFacUtils for a reader
+ return credentialReader;
+ }
+
+ public void setCredentialReader(CredentialReader credentialReader) { // sets the reader used for credential store look-ups
+ this.credentialReader = credentialReader;
+ }
+
+ public RequestData getRequestData() { // request data read for token id, gateway id and MyProxy settings
+ return requestData;
+ }
+
+ public void setRequestData(RequestData requestData) { // replaces the request data used by the credential methods
+ this.requestData = requestData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
new file mode 100644
index 0000000..c3978b1
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -0,0 +1,367 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.util;
+
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.security.TokenizedMyProxyAuthInfo;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gfac.ssh.impl.GSISSHAbstractCluster;
+import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+import org.apache.airavata.gfac.ssh.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+
+ /**
+  * Static helpers for the GSI-SSH job submission path.
+  * <p>
+  * {@link #addSecurityContext(JobExecutionContext)} attaches a {@link GSISecurityContext}, backed by a
+  * pooled {@link PBSCluster} connection, to a job execution context.
+  * {@link #createJobDescriptor(JobExecutionContext, Cluster)} builds the {@link JobDescriptor} for a job
+  * from its execution context.
+  */
+ public class GFACGSISSHUtils {
+     private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
+
+     public static final String PBS_JOB_MANAGER = "pbs";
+     public static final String SLURM_JOB_MANAGER = "slurm";
+     public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+     public static final String LSF_JOB_MANAGER = "lsf";
+
+     /** Number of pooled connections per key below which a new connection is always created. */
+     public static int maxClusterCount = 5;
+     /** Connection pool keyed by user name + host name + SSH port; accessed while synchronized on the map. */
+     public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
+
+     /**
+      * Adds a GSI security context for the job's host to the given job execution context.
+      * <p>
+      * Only acts when the preferred submission interface is SSH with the GSI security protocol. For
+      * GLOBUS, UNICORE, CLOUD and LOCAL an error is logged and nothing is added. A pooled connection is
+      * reused only if it reports itself connected and a directory listing succeeds; otherwise a new
+      * {@link PBSCluster} is created and added to the pool.
+      *
+      * @param jobExecutionContext the context to which the security context is added
+      * @throws GFacException if anything fails while building the security context (the cause is preserved)
+      * @throws ApplicationSettingsException declared for callers; failures inside the method body are
+      *         wrapped in {@link GFacException}
+      */
+     public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+         JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+         JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+         try {
+             AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+             SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+             if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE
+                     || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) {
+                 logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+             } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+                 String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+                 RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
+                 requestData.setTokenId(credentialStoreToken);
+                 PBSCluster pbsCluster = null;
+                 GSISecurityContext context = null;
+
+                 TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
+                 CredentialReader credentialReader = GFacUtils.getCredentialReader();
+                 if (credentialReader != null) {
+                     try {
+                         CertificateCredential credential = (CertificateCredential) credentialReader.getCredential(jobExecutionContext.getGatewayID(), credentialStoreToken);
+                         requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
+                     } catch (Exception e) {
+                         // Best effort: continue without a user name, but keep the stack trace in the log.
+                         logger.error(e.getLocalizedMessage(), e);
+                     }
+                 }
+
+                 String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName() +
+                         sshJobSubmission.getSshPort();
+                 boolean recreate = false;
+                 synchronized (clusters) {
+                     List<Cluster> pool = clusters.get(key);
+                     if (pool == null || pool.size() < maxClusterCount) {
+                         // No pool yet, or the pool is not full: always open a new connection.
+                         recreate = true;
+                     } else {
+                         // Pool is full: pick one of the first maxClusterCount entries at random.
+                         int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                         if (pool.get(i).getSession().isConnected()) {
+                             pbsCluster = (PBSCluster) pool.get(i);
+                             try {
+                                 // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
+                                 pbsCluster.listDirectory("~/");
+                             } catch (Exception e) {
+                                 pool.remove(i);
+                                 logger.info("Connection found the connection map is expired, so we create from the scratch");
+                                 // NOTE(review): the threshold grows on every expired connection and is never
+                                 // reduced - confirm this is intended.
+                                 maxClusterCount++;
+                                 recreate = true; // create the pbsCluster again if the probe failed
+                             }
+                         } else {
+                             pool.remove(i);
+                             recreate = true;
+                         }
+                         if (!recreate) {
+                             // Only report and use the pooled connection once the probe has succeeded.
+                             logger.info("Re-using the same connection used with the connection string:" + key);
+                             context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster);
+                         }
+                     }
+
+                     if (recreate) {
+                         ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
+                                 sshJobSubmission.getSshPort());
+
+                         JobManagerConfiguration jConfig = null;
+                         String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
+                         // Null-check the type itself; calling toString() first would throw before the check.
+                         ResourceJobManagerType jobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType();
+                         if (jobManagerType == null) {
+                             logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+                             jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                         } else {
+                             String jobManager = jobManagerType.toString();
+                             if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                             } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+                             } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+                             } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+                             }
+                             // NOTE(review): any other job manager type leaves jConfig null - confirm that
+                             // PBSCluster accepts a null configuration.
+                         }
+                         pbsCluster = new PBSCluster(serverInfo, tokenizedMyProxyAuthInfo, jConfig);
+                         context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster);
+                         if (pool == null) {
+                             pool = new ArrayList<Cluster>();
+                             clusters.put(key, pool);
+                         }
+                         pool.add(pbsCluster);
+                     }
+                 }
+
+                 jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), context);
+             }
+         } catch (Exception e) {
+             throw new GFacException("An error occurred while creating GSI security context", e);
+         }
+     }
+
+     /**
+      * Builds the job descriptor for the given job execution context.
+      * <p>
+      * Fills in notification e-mail settings (when enabled in the server settings), callback address,
+      * directories, executable, account, a generated job name, command-line values derived from the
+      * inputs and outputs, scheduling values, module/pre/post commands and, for MPI/OpenMP style
+      * applications, the job submitter command.
+      *
+      * @param jobExecutionContext the context describing the job
+      * @param cluster the cluster the job runs on; cast to {@link GSISSHAbstractCluster} to read the user name
+      * @return the populated job descriptor
+      */
+     public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
+         JobDescriptor jobDescriptor = new JobDescriptor();
+         TaskDetails taskData = jobExecutionContext.getTaskData();
+         ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+         // Read once; may be null, which is reported further down rather than failing here.
+         ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
+         try {
+             if ("true".equalsIgnoreCase(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE))) {
+                 jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
+                 String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+
+                 if (taskData.isSetEmailAddresses()) {
+                     List<String> emailList = taskData.getEmailAddresses();
+                     String elist = GFacUtils.listToCsv(emailList, ',');
+                     if (emailids != null && !emailids.isEmpty()) {
+                         emailids = emailids + "," + elist;
+                     } else {
+                         emailids = elist;
+                     }
+                 }
+                 if (emailids != null && !emailids.isEmpty()) {
+                     logger.info("Email list: " + emailids);
+                     jobDescriptor.setMailAddress(emailids);
+                 }
+             }
+         } catch (ApplicationSettingsException e) {
+             logger.error("ApplicationSettingsException : " + e.getLocalizedMessage());
+         }
+         // this is common for any application descriptor
+         jobDescriptor.setCallBackIp(ServerSettings.getIp());
+         jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
+         jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+         jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+         jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath());
+         jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+         jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
+
+         // Account: the task's own scheduling wins, otherwise fall back to the compute resource preference.
+         String computationalProjectAccount = taskScheduling != null ? taskScheduling.getComputationalProjectAccount() : null;
+         if (computationalProjectAccount == null) {
+             ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
+             if (computeResourcePreference != null) {
+                 computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
+             }
+         }
+         if (computationalProjectAccount != null) {
+             jobDescriptor.setAcountString(computationalProjectAccount);
+         }
+
+         // We always set the job name. The sum is done in long arithmetic so it cannot wrap negative.
+         Random random = new Random();
+         jobDescriptor.setJobName("A" + (random.nextInt(Integer.MAX_VALUE) + 99999999L));
+         jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
+
+         List<String> inputValues = new ArrayList<String>();
+         MessageContext input = jobExecutionContext.getInMessageContext();
+         // Sort the inputs by input order, then build the command list. A list with a stable sort is used
+         // so that inputs sharing the same order value are all kept.
+         List<InputDataObjectType> sortedInputs = new ArrayList<InputDataObjectType>();
+         for (Object object : input.getParameters().values()) {
+             if (object instanceof InputDataObjectType) {
+                 sortedInputs.add((InputDataObjectType) object);
+             }
+         }
+         Collections.sort(sortedInputs, new Comparator<InputDataObjectType>() {
+             @Override
+             public int compare(InputDataObjectType left, InputDataObjectType right) {
+                 int leftOrder = left.getInputOrder();
+                 int rightOrder = right.getInputOrder();
+                 // Explicit comparison; subtracting the two values could overflow.
+                 return leftOrder < rightOrder ? -1 : (leftOrder == rightOrder ? 0 : 1);
+             }
+         });
+         for (InputDataObjectType inputDataObjectType : sortedInputs) {
+             if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                 continue;
+             }
+             if (inputDataObjectType.getApplicationArgument() != null
+                     && !inputDataObjectType.getApplicationArgument().equals("")) {
+                 inputValues.add(inputDataObjectType.getApplicationArgument());
+             }
+
+             if (inputDataObjectType.getValue() != null
+                     && !inputDataObjectType.getValue().equals("")) {
+                 if (inputDataObjectType.getType() == DataType.URI) {
+                     // set only the relative path
+                     inputValues.add(lastPathSegment(inputDataObjectType.getValue()));
+                 } else {
+                     inputValues.add(inputDataObjectType.getValue());
+                 }
+             }
+         }
+
+         Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
+         for (Object outputParam : outputParams.values()) {
+             if (outputParam instanceof OutputDataObjectType) {
+                 OutputDataObjectType output = (OutputDataObjectType) outputParam;
+                 if (output.getApplicationArgument() != null
+                         && !output.getApplicationArgument().equals("")) {
+                     inputValues.add(output.getApplicationArgument());
+                 }
+                 if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+                     if (output.getType() == DataType.URI) {
+                         inputValues.add(lastPathSegment(output.getValue()));
+                     }
+                 }
+             }
+         }
+         jobDescriptor.setInputValues(inputValues);
+
+         // Both values come from the same server info; one cast to the abstract cluster type covers both.
+         String userName = ((GSISSHAbstractCluster) cluster).getServerInfo().getUserName();
+         jobDescriptor.setUserName(userName);
+         jobDescriptor.setShellName("/bin/bash");
+         jobDescriptor.setAllEnvExport(true);
+         jobDescriptor.setOwner(userName);
+
+         if (taskScheduling != null) {
+             int totalNodeCount = taskScheduling.getNodeCount();
+             int totalCPUCount = taskScheduling.getTotalCPUCount();
+
+             if (taskScheduling.getQueueName() != null) {
+                 jobDescriptor.setQueueName(taskScheduling.getQueueName());
+             }
+             if (totalNodeCount > 0) {
+                 jobDescriptor.setNodes(totalNodeCount);
+             }
+             if (totalCPUCount > 0) {
+                 // Processes-per-node is only defined when a node count was given; skip it otherwise
+                 // instead of dividing by zero.
+                 if (totalNodeCount > 0) {
+                     jobDescriptor.setProcessesPerNode(totalCPUCount / totalNodeCount);
+                 }
+                 jobDescriptor.setCPUCount(totalCPUCount);
+             }
+             if (taskScheduling.getWallTimeLimit() > 0) {
+                 jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
+                 if (ResourceJobManagerType.LSF.equals(resourceJobManager.getResourceJobManagerType())) {
+                     jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
+                 }
+             }
+
+             if (taskScheduling.getTotalPhysicalMemory() > 0) {
+                 jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
+             }
+         } else {
+             logger.error("Task scheduling cannot be null at this point..");
+         }
+
+         ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+         List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+         if (moduleCmds != null) {
+             for (String moduleCmd : moduleCmds) {
+                 jobDescriptor.addModuleLoadCommands(moduleCmd);
+             }
+         }
+         List<String> preJobCommands = appDepDescription.getPreJobCommands();
+         if (preJobCommands != null) {
+             for (String preJobCommand : preJobCommands) {
+                 jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
+             }
+         }
+
+         List<String> postJobCommands = appDepDescription.getPostJobCommands();
+         if (postJobCommands != null) {
+             for (String postJobCommand : postJobCommands) {
+                 jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
+             }
+         }
+
+         ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+         if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP
+                 || parallelism == ApplicationParallelismType.OPENMP_MPI) {
+             Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+             if (jobManagerCommands != null && jobManagerCommands.containsKey(JobManagerCommand.SUBMISSION)) {
+                 jobDescriptor.setJobSubmitter(jobManagerCommands.get(JobManagerCommand.SUBMISSION));
+             }
+         }
+         return jobDescriptor;
+     }
+
+     /** Returns the part of {@code path} after the last {@link File#separatorChar}, or the whole path if there is none. */
+     private static String lastPathSegment(String path) {
+         return path.substring(path.lastIndexOf(File.separatorChar) + 1);
+     }
+
+     /**
+      * Substitutes the {@code $workingDir}, {@code $inputDir} and {@code $outputDir} placeholders in a command
+      * with the corresponding directories of the job execution context. The substitution is literal, so
+      * {@code $} or backslash characters in a directory path are copied through unchanged.
+      */
+     private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
+         return value.replace("$workingDir", jobExecutionContext.getWorkingDir())
+                 .replace("$inputDir", jobExecutionContext.getInputDir())
+                 .replace("$outputDir", jobExecutionContext.getOutputDir());
+     }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..e7c6572
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+ /**
+  * Activity listener that reacts to {@link JobStatusChangeRequestEvent}s: it writes the new job state to
+  * the registry, then publishes a {@link JobStatusChangeEvent} on both the monitor publisher and the
+  * messaging publisher. Collaborators are injected through {@link #setup(Object...)}.
+  */
+ public class AiravataJobStatusUpdator implements AbstractActivityListener {
+     private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+     private Registry airavataRegistry;
+
+     private MonitorPublisher monitorPublisher;
+     private Publisher publisher;
+
+
+     public Registry getAiravataRegistry() {
+         return airavataRegistry;
+     }
+
+     public void setAiravataRegistry(Registry airavataRegistry) {
+         this.airavataRegistry = airavataRegistry;
+     }
+
+
+     /**
+      * Persists the requested job state and republishes it as a {@link JobStatusChangeEvent}.
+      * Requests that carry no state are ignored.
+      *
+      * @param jobStatus the status change request
+      * @throws Exception if updating the registry or publishing fails; the cause is preserved
+      */
+     @Subscribe
+     public void updateRegistry(JobStatusChangeRequestEvent jobStatus) throws Exception {
+         JobState state = jobStatus.getState();
+         if (state == null) {
+             return;
+         }
+         try {
+             String taskID = jobStatus.getJobIdentity().getTaskId();
+             String jobID = jobStatus.getJobIdentity().getJobId();
+             String expId = jobStatus.getJobIdentity().getExperimentId();
+             updateJobStatus(expId, taskID, jobID, state);
+             logger.debug("expId - {}: Publishing job status for {}:{}", expId, jobID, state);
+             JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
+             monitorPublisher.publish(event);
+             String messageId = AiravataUtils.getId("JOB");
+             MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId());
+             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+             publisher.publish(msgCntxt);
+         } catch (Exception e) {
+             logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data"
+                     + e.getLocalizedMessage(), e);
+             throw new Exception("Error persisting job status..", e);
+         }
+     }
+
+     /**
+      * Writes a new job status to the registry. If the stored job is already CANCELED or CANCELING that
+      * state is kept; otherwise {@code state} is used. A job with no stored details or no stored status
+      * simply takes {@code state}.
+      *
+      * @param expId experiment id, used for logging only
+      * @param taskId task id, first part of the registry identifier
+      * @param jobID job id, second part of the registry identifier
+      * @param state the requested job state
+      * @throws Exception if the registry lookup or update fails
+      */
+     public void updateJobStatus(String expId, String taskId, String jobID, JobState state) throws Exception {
+         logger.info("expId - {}: Updating job status for {}:{}", expId, jobID, state);
+         CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+         JobDetails details = (JobDetails) airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids);
+         if (details == null) {
+             details = new JobDetails();
+         }
+         // Freshly created details (and possibly stored ones) may have no status yet; treat that as "no state".
+         org.apache.airavata.model.workspace.experiment.JobStatus previous = details.getJobStatus();
+         JobState previousState = previous == null ? null : previous.getJobState();
+
+         org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+         if (JobState.CANCELED.equals(previousState) || JobState.CANCELING.equals(previousState)) {
+             // Cancellation is sticky: later updates must not overwrite it.
+             status.setJobState(previousState);
+         } else {
+             status.setJobState(state);
+         }
+         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+         details.setJobStatus(status);
+         details.setJobID(jobID);
+         logger.debug("expId - {}: Updated job status for {}:{}", expId, jobID, details.getJobStatus());
+         airavataRegistry.update(RegistryModelType.JOB_STATUS, status, ids);
+     }
+
+     /**
+      * Picks the collaborators out of the given objects by type: a {@link Registry}, a
+      * {@link MonitorPublisher} and a {@link Publisher}. Objects of other types are ignored.
+      *
+      * @param configurations candidate collaborator objects
+      */
+     public void setup(Object... configurations) {
+         for (Object configuration : configurations) {
+             if (configuration instanceof Registry) {
+                 this.airavataRegistry = (Registry) configuration;
+             } else if (configuration instanceof MonitorPublisher) {
+                 this.monitorPublisher = (MonitorPublisher) configuration;
+             } else if (configuration instanceof Publisher) {
+                 this.publisher = (Publisher) configuration;
+             }
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..94029be
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+ private Registry airavataRegistry;
+ private MonitorPublisher monitorPublisher;
+ private Publisher publisher;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void setupTaskStatus(TaskStatusChangeRequestEvent taskStatus) throws Exception{
+ try {
+ updateTaskStatus(taskStatus.getTaskIdentity().getTaskId(), taskStatus.getState());
+ logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":"
+ + taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId());
+ TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity());
+ monitorPublisher.publish(event);
+ String messageId = AiravataUtils.getId("TASK");
+ MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId());
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(msgCntxt);
+ } catch (Exception e) {
+ String msg = "Error persisting data task status to database...";
+ logger.error(msg + e.getLocalizedMessage(), e);
+ throw new Exception(msg, e);
+ }
+ }
+
+ @Subscribe
+ public void setupTaskStatus(JobStatusChangeEvent jobStatus) throws Exception{
+ TaskState state=TaskState.UNKNOWN;
+ switch(jobStatus.getState()){
+ case ACTIVE:
+ state=TaskState.EXECUTING; break;
+ case CANCELED:
+ state=TaskState.CANCELED; break;
+ case COMPLETE: case FAILED:
+ state=TaskState.POST_PROCESSING; break;
+ case HELD: case SUSPENDED: case QUEUED:
+ state=TaskState.WAITING; break;
+ case SETUP:
+ state=TaskState.PRE_PROCESSING; break;
+ case SUBMITTED:
+ state=TaskState.STARTED; break;
+ case UN_SUBMITTED:
+ state=TaskState.CANCELED; break;
+ case CANCELING:
+ state=TaskState.CANCELING; break;
+ default:
+ return;
+ }
+ try {
+ updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state);
+ logger.debug("expId - {}: Publishing task status for " + jobStatus.getJobIdentity().getTaskId() + ":"
+ + state.toString(), jobStatus.getJobIdentity().getExperimentId());
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(),
+ jobStatus.getJobIdentity().getWorkflowNodeId(),
+ jobStatus.getJobIdentity().getExperimentId(),
+ jobStatus.getJobIdentity().getGatewayId());
+ TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
+ monitorPublisher.publish(event);
+ String messageId = AiravataUtils.getId("TASK");
+ MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId());
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(msgCntxt);
+
+ } catch (Exception e) {
+ logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data" + e.getLocalizedMessage(), e);
+ throw new Exception("Error persisting task status..", e);
+ }
+ }
+
+ public TaskState updateTaskStatus(String taskId, TaskState state) throws Exception {
+ TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId);
+ if(details == null) {
+ logger.error("Task details cannot be null at this point");
+ throw new Exception("Task details cannot be null at this point");
+ }
+ org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+ if(!TaskState.CANCELED.equals(details.getTaskStatus().getExecutionState())
+ && !TaskState.CANCELING.equals(details.getTaskStatus().getExecutionState())){
+ status.setExecutionState(state);
+ }else{
+ status.setExecutionState(details.getTaskStatus().getExecutionState());
+ }
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setTaskStatus(status);
+ logger.debug("Updating task status for "+taskId+":"+details.getTaskStatus().toString());
+
+ airavataRegistry.update(RegistryModelType.TASK_STATUS, status, taskId);
+ return status.getExecutionState();
+ }
+
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof Publisher){
+ this.publisher=(Publisher) configuration;
+ }
+ }
+ }
+
+
+ @Subscribe
+ public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException {
+ String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+ logger.debug("Task Output changed event received for workflow node : " +
+ taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too.
+ MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId());
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..092774b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.WorkflowIdentifier;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+ /**
+  * Activity listener that translates task status changes into workflow node status changes.
+  * For every {@link TaskStatusChangeEvent} with a mappable state it stores a new
+  * {@link WorkflowNodeStatus} through the registry and publishes a
+  * {@link WorkflowNodeStatusChangeEvent} on both the {@link MonitorPublisher} and the
+  * messaging {@link Publisher}. Collaborators are supplied through {@link #setup(Object...)}.
+  */
+ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+     private static final Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+     private Registry airavataRegistry;
+     private MonitorPublisher monitorPublisher;
+     private Publisher publisher;
+
+     /**
+      * @return the registry used to read and update workflow node data
+      */
+     public Registry getAiravataRegistry() {
+         return airavataRegistry;
+     }
+
+     /**
+      * @param airavataRegistry the registry used to read and update workflow node data
+      */
+     public void setAiravataRegistry(Registry airavataRegistry) {
+         this.airavataRegistry = airavataRegistry;
+     }
+
+     /**
+      * Event-bus handler that maps a task state onto a workflow node state, stores it and
+      * publishes the resulting change event. Task states without a mapping are ignored.
+      *
+      * @param taskStatus the task status change event delivered by the event bus
+      * @throws Exception wrapping any failure raised while storing or publishing the status
+      */
+     @Subscribe
+     public void setupWorkflowNodeStatus(TaskStatusChangeEvent taskStatus) throws Exception {
+         final WorkflowNodeState state;
+         switch (taskStatus.getState()) {
+             case CANCELED:
+                 state = WorkflowNodeState.CANCELED;
+                 break;
+             case COMPLETED:
+                 state = WorkflowNodeState.COMPLETED;
+                 break;
+             case CONFIGURING_WORKSPACE:
+             case STARTED:
+                 state = WorkflowNodeState.INVOKED;
+                 break;
+             case FAILED:
+                 state = WorkflowNodeState.FAILED;
+                 break;
+             case EXECUTING:
+             case WAITING:
+             case PRE_PROCESSING:
+             case POST_PROCESSING:
+             case OUTPUT_DATA_STAGING:
+             case INPUT_DATA_STAGING:
+                 state = WorkflowNodeState.EXECUTING;
+                 break;
+             case CANCELING:
+                 state = WorkflowNodeState.CANCELING;
+                 break;
+             default:
+                 // Task states with no workflow node counterpart produce no update.
+                 return;
+         }
+
+         // Resolved once up front so the success path and the error log use the same values.
+         final String expId = taskStatus.getTaskIdentity().getExperimentId();
+         try {
+             final String nodeId = taskStatus.getTaskIdentity().getWorkflowNodeId();
+             final String gatewayId = taskStatus.getTaskIdentity().getGatewayId();
+
+             updateWorkflowNodeStatus(expId, nodeId, state);
+             logger.debug("expId - {}: Publishing workflow node status for {}:{}", expId, nodeId, state);
+
+             WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(nodeId, expId, gatewayId);
+             WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
+             monitorPublisher.publish(event);
+
+             String messageId = AiravataUtils.getId("WFNODE");
+             MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, gatewayId);
+             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+             publisher.publish(msgCntxt);
+         } catch (Exception e) {
+             // Separator added: the exception text used to be glued directly onto "data".
+             logger.error("expId - " + expId + ": Error persisting data: " + e.getLocalizedMessage(), e);
+             throw new Exception("Error persisting workflow node status..", e);
+         }
+     }
+
+     /**
+      * Stores a new status, timestamped with the current time, for the given workflow node.
+      *
+      * @param experimentId   experiment the node belongs to; used for logging only
+      * @param workflowNodeId identifier of the workflow node whose status is updated
+      * @param state          the new workflow node state; must not be null
+      * @throws NullPointerException if {@code state} is null
+      * @throws Exception if the registry lookup or update fails
+      */
+     public void updateWorkflowNodeStatus(String experimentId, String workflowNodeId, WorkflowNodeState state) throws Exception {
+         if (state == null) {
+             // Fail before touching the registry, as the previous state.toString() call did.
+             throw new NullPointerException("state");
+         }
+         logger.info("expId - {}: Updating workflow node status for {}:{}", experimentId, workflowNodeId, state);
+         WorkflowNodeDetails details = (WorkflowNodeDetails) airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+         if (details == null) {
+             details = new WorkflowNodeDetails();
+             details.setNodeInstanceId(workflowNodeId);
+         }
+         WorkflowNodeStatus status = new WorkflowNodeStatus();
+         status.setWorkflowNodeState(state);
+         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+         // NOTE(review): details receives the status but is not itself written back here;
+         // only the status object is passed to the registry update below.
+         details.setWorkflowNodeStatus(status);
+         airavataRegistry.update(RegistryModelType.WORKFLOW_NODE_STATUS, status, workflowNodeId);
+     }
+
+     /**
+      * Wires this listener's collaborators from an untyped argument list.
+      * Each argument is matched by runtime type, in the order Registry, MonitorPublisher,
+      * Publisher; arguments of any other type are ignored.
+      *
+      * @param configurations candidate collaborator objects
+      */
+     public void setup(Object... configurations) {
+         for (Object configuration : configurations) {
+             if (configuration instanceof Registry) {
+                 this.airavataRegistry = (Registry) configuration;
+                 continue;
+             }
+             if (configuration instanceof MonitorPublisher) {
+                 this.monitorPublisher = (MonitorPublisher) configuration;
+                 continue;
+             }
+             if (configuration instanceof Publisher) {
+                 this.publisher = (Publisher) configuration;
+             }
+         }
+     }
+ }