You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/18 17:24:47 UTC

airavata git commit: ssh job submission task impl

Repository: airavata
Updated Branches:
  refs/heads/master ac3be7ae4 -> 2d9ea1f4a


ssh job submission task impl


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2d9ea1f4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2d9ea1f4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2d9ea1f4

Branch: refs/heads/master
Commit: 2d9ea1f4a1394bffb2f90f847c3d25ca32946e57
Parents: ac3be7a
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu Jun 18 11:24:41 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu Jun 18 11:24:41 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/common/utils/Constants.java |  5 --
 .../store/server/CredentialStoreServer.java     |  4 +-
 .../credential/store/client/TestSSLClient.java  |  4 +-
 .../apache/airavata/gfac/core/GFacUtils.java    | 77 +++++++++++---------
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 71 +++++++++++++++++-
 5 files changed, 116 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 83f0cc5..6e1cb84 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -36,9 +36,4 @@ public final class Constants {
     public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server";
     public static final String ADMIN_USERNAME = "admin.user.name";
     public static final String ADMIN_PASSWORD = "admin.password";
-
-    public static final String PBS_JOB_MANAGER = "pbs";
-    public static final String SLURM_JOB_MANAGER = "slurm";
-    public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
-    public static final String LSF_JOB_MANAGER = "LSF";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
index 9fa3adb..3439e00 100644
--- a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
+++ b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
@@ -66,8 +66,8 @@ public class CredentialStoreServer  implements IServer {
                         new TSSLTransportFactory.TSSLTransportParameters();
                 String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath();
                 String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword();
-                final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960"));
-                final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null);
+                final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960"));
+                final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null);
                 params.setKeyStore(keystorePath, keystorePWD);
 
                 TServerSocket serverTransport = TSSLTransportFactory.getServerSocket(serverPort, 100, InetAddress.getByName(serverHost), params);

http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
index 2c93c71..fa19ea4 100644
--- a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
+++ b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
@@ -55,8 +55,8 @@ public class TestSSLClient {
             String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath();
             String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword();
             params.setTrustStore(keystorePath, keystorePWD);
-            final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960"));
-            final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null);
+            final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960"));
+            final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null);
 
             transport = TSSLTransportFactory.getClientSocket(serverHost, serverPort, 10000, params);
             TProtocol protocol = new TBinaryProtocol(transport);

http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index e630570..8c08940 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -37,11 +37,17 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.commons.io.FileUtils;
@@ -230,27 +236,29 @@ public class GFacUtils {
         return buf.toString();
     }
 
-//	public static void saveJobStatus(JobExecutionContext jobExecutionContext,
-//                                     JobDetails details, JobState state) throws GFacException {
-//		try {
-//            // first we save job details to the registry for sa and then save the job status.
-//            ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-//            JobStatus status = new JobStatus();
-//            status.setJobState(state);
-//            details.setJobStatus(status);
-//            experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
+	public static void saveJobStatus(ProcessContext processContext,
+                                     JobModel jobModel, JobState state) throws GFacException {
+		try {
+            // first we save job jobModel to the registry for sa and then save the job status.
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            JobStatus status = new JobStatus();
+            status.setJobState(state);
+            jobModel.setJobStatus(status);
+            // FIXME - Should change according to the experiment catalog impl
+//            experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, jobModel,
 //                    new CompositeIdentifier(jobExecutionContext.getTaskData()
-//                            .getTaskID(), details.getJobID()));
-//            JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
-//                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
-//                    jobExecutionContext.getGatewayID());
-//            JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
-//            jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
-//        } catch (Exception e) {
-//			throw new GFacException("Error persisting job status"
-//					+ e.getLocalizedMessage(), e);
-//		}
-//	}
+//                            .getTaskID(), jobModel.getJobID()));
+            // FIXME - Routing keys might need to identify according to new data models
+            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), null,
+                    processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+                    processContext.getGatewayId());
+            JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+            processContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
+        } catch (Exception e) {
+			throw new GFacException("Error persisting job status"
+					+ e.getLocalizedMessage(), e);
+		}
+	}
 
 //	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
 //			JobDetails details, JobState state) throws GFacException {
@@ -270,21 +278,22 @@ public class GFacUtils {
 //		}
 //	}
 
-//	public static void saveErrorDetails(
-//			JobExecutionContext jobExecutionContext, String errorMessage)
-//			throws GFacException {
-//		try {
-//			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-//			ErrorModel details = new ErrorModel();
-//			details.setActualErrorMessage(errorMessage);
-//			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-//			experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
+	public static void saveErrorDetails(
+			ProcessContext processContext, String errorMessage)
+			throws GFacException {
+		try {
+			ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+			ErrorModel details = new ErrorModel();
+			details.setActualErrorMessage(errorMessage);
+			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+			// FIXME : Save error model according to new data model
+//            experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
 //					jobExecutionContext.getTaskData().getTaskID());
-//		} catch (Exception e) {
-//			throw new GFacException("Error persisting job status"
-//					+ e.getLocalizedMessage(), e);
-//		}
-//	}
+		} catch (Exception e) {
+			throw new GFacException("Error persisting job status"
+					+ e.getLocalizedMessage(), e);
+		}
+	}
 
     public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
         Map<String, Object> map = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index f14502f..9fb6aae 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -23,22 +23,28 @@ package org.apache.airavata.gfac.impl.task;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.gfac.core.*;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 public class SSHJobSubmissionTask implements JobSubmissionTask {
@@ -84,9 +90,48 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
             }
             File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
             if (jobFile != null && jobFile.exists()){
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
                 String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
-            }
+                if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobStatus(processContext, jobModel, JobState.SUBMITTED);
+//                    publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+//                            , GfacExperimentState.JOBSUBMITTED));
+                    processContext.setJobModel(jobModel);
+                    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+//                        publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+//                                , GfacExperimentState.JOBSUBMITTED));
+                        GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED);
+                    }
+                } else {
+                    processContext.setJobModel(jobModel);
+                    int verificationTryCount = 0;
+                    while (verificationTryCount++ < 3) {
+                        String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
+                            // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+                            jobId = verifyJobId;
+                            jobModel.setJobId(jobId);
+//                            publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+//                                    , GfacExperimentState.JOBSUBMITTED));
+                            GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED);
+                            break;
+                        }
+                        Thread.sleep(verificationTryCount * 1000);
+                    }
+                }
 
+                if (jobId == null || jobId.isEmpty()) {
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:"
+                            + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
+                    log.error(msg);
+                    GFacUtils.saveErrorDetails(processContext, msg);
+                    // FIXME : Need to handle according to status update chain
+//                    GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED);
+                    return TaskState.FAILED;
+                }
+            }
+            return TaskState.EXECUTING;
         } catch (AppCatalogException e) {
             log.error("Error while instatiating app catalog",e);
             throw new TaskException("Error while instatiating app catalog", e);
@@ -99,10 +144,32 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
         } catch (SSHApiException e) {
             log.error("Error occurred while submitting the job", e);
             throw new TaskException("Error occurred while submitting the job", e);
+        } catch (IOException e) {
+            log.error("Error while reading the content of the job file", e);
+            throw new TaskException("Error while reading the content of the job file", e);
+        } catch (InterruptedException e) {
+            log.error("Error occurred while verifying the job submission", e);
+            throw new TaskException("Error occurred while verifying the job submission", e);
         }
-        return null;
     }
 
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
+        JobStatus status = remoteCluster.getJobStatus(jobID);
+        return status != null &&  status.getJobState() != JobState.UNKNOWN;
+    }
+
+    /** Looks up the remote job id by job name, using the user name from the cluster's server info.
+     *  Best-effort: a failed lookup is logged together with its cause and {@code null} is returned. */
+    private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+        try {
+            return remoteCluster.getJobIdByJobName(jobDetails.getJobName(), remoteCluster.getServerInfo().getUserName());
+        } catch (SSHApiException e) {
+            log.error("Error while verifying JobId from JobName", e);
+            return null;
+        }
+    }
+
+
     @Override
     public TaskState recover(TaskContext taskContext) throws TaskException {
         return null;