You are viewing a plain-text version of this content; the hyperlink to its canonical (HTML) version was lost in this rendering.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/18 17:24:47 UTC
airavata git commit: ssh job submission task impl
Repository: airavata
Updated Branches:
refs/heads/master ac3be7ae4 -> 2d9ea1f4a
ssh job submission task impl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2d9ea1f4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2d9ea1f4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2d9ea1f4
Branch: refs/heads/master
Commit: 2d9ea1f4a1394bffb2f90f847c3d25ca32946e57
Parents: ac3be7a
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu Jun 18 11:24:41 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu Jun 18 11:24:41 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/common/utils/Constants.java | 5 --
.../store/server/CredentialStoreServer.java | 4 +-
.../credential/store/client/TestSSLClient.java | 4 +-
.../apache/airavata/gfac/core/GFacUtils.java | 77 +++++++++++---------
.../gfac/impl/task/SSHJobSubmissionTask.java | 71 +++++++++++++++++-
5 files changed, 116 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 83f0cc5..6e1cb84 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -36,9 +36,4 @@ public final class Constants {
public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server";
public static final String ADMIN_USERNAME = "admin.user.name";
public static final String ADMIN_PASSWORD = "admin.password";
-
- public static final String PBS_JOB_MANAGER = "pbs";
- public static final String SLURM_JOB_MANAGER = "slurm";
- public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
- public static final String LSF_JOB_MANAGER = "LSF";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
index 9fa3adb..3439e00 100644
--- a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
+++ b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java
@@ -66,8 +66,8 @@ public class CredentialStoreServer implements IServer {
new TSSLTransportFactory.TSSLTransportParameters();
String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath();
String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword();
- final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960"));
- final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null);
+ final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960"));
+ final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null);
params.setKeyStore(keystorePath, keystorePWD);
TServerSocket serverTransport = TSSLTransportFactory.getServerSocket(serverPort, 100, InetAddress.getByName(serverHost), params);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
----------------------------------------------------------------------
diff --git a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
index 2c93c71..fa19ea4 100644
--- a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
+++ b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java
@@ -55,8 +55,8 @@ public class TestSSLClient {
String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath();
String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword();
params.setTrustStore(keystorePath, keystorePWD);
- final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960"));
- final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null);
+ final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960"));
+ final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null);
transport = TSSLTransportFactory.getClientSocket(serverHost, serverPort, 10000, params);
TProtocol protocol = new TBinaryProtocol(transport);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index e630570..8c08940 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -37,11 +37,17 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.ExperimentState;
import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
import org.apache.commons.io.FileUtils;
@@ -230,27 +236,29 @@ public class GFacUtils {
return buf.toString();
}
-// public static void saveJobStatus(JobExecutionContext jobExecutionContext,
-// JobDetails details, JobState state) throws GFacException {
-// try {
-// // first we save job details to the registry for sa and then save the job status.
-// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-// JobStatus status = new JobStatus();
-// status.setJobState(state);
-// details.setJobStatus(status);
-// experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
+ public static void saveJobStatus(ProcessContext processContext,
+ JobModel jobModel, JobState state) throws GFacException {
+ try {
+ // first we save job jobModel to the registry for sa and then save the job status.
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ jobModel.setJobStatus(status);
+ // FIXME - Should change according to the experiment catalog impl
+// experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, jobModel,
// new CompositeIdentifier(jobExecutionContext.getTaskData()
-// .getTaskID(), details.getJobID()));
-// JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
-// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
-// jobExecutionContext.getGatewayID());
-// JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
-// jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
-// } catch (Exception e) {
-// throw new GFacException("Error persisting job status"
-// + e.getLocalizedMessage(), e);
-// }
-// }
+// .getTaskID(), jobModel.getJobID()));
+ // FIXME - Routing keys might need to identify according to new data models
+ JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), null,
+ processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
+ processContext.getGatewayId());
+ JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+ processContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
// public static void updateJobStatus(JobExecutionContext jobExecutionContext,
// JobDetails details, JobState state) throws GFacException {
@@ -270,21 +278,22 @@ public class GFacUtils {
// }
// }
-// public static void saveErrorDetails(
-// JobExecutionContext jobExecutionContext, String errorMessage)
-// throws GFacException {
-// try {
-// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-// ErrorModel details = new ErrorModel();
-// details.setActualErrorMessage(errorMessage);
-// details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
+ public static void saveErrorDetails(
+ ProcessContext processContext, String errorMessage)
+ throws GFacException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ ErrorModel details = new ErrorModel();
+ details.setActualErrorMessage(errorMessage);
+ details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ // FIXME : Save error model according to new data model
+// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
// jobExecutionContext.getTaskData().getTaskID());
-// } catch (Exception e) {
-// throw new GFacException("Error persisting job status"
-// + e.getLocalizedMessage(), e);
-// }
-// }
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
Map<String, Object> map = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index f14502f..9fb6aae 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -23,22 +23,28 @@ package org.apache.airavata.gfac.impl.task;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.*;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.Map;
public class SSHJobSubmissionTask implements JobSubmissionTask {
@@ -84,9 +90,48 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
}
File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
if (jobFile != null && jobFile.exists()){
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
- }
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobStatus(processContext, jobModel, JobState.SUBMITTED);
+// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+// , GfacExperimentState.JOBSUBMITTED));
+ processContext.setJobModel(jobModel);
+ if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+// , GfacExperimentState.JOBSUBMITTED));
+ GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED);
+ }
+ } else {
+ processContext.setJobModel(jobModel);
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
+// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+// , GfacExperimentState.JOBSUBMITTED));
+ GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED);
+ break;
+ }
+ Thread.sleep(verificationTryCount * 1000);
+ }
+ }
+ if (jobId == null || jobId.isEmpty()) {
+ String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:"
+ + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
+ log.error(msg);
+ GFacUtils.saveErrorDetails(processContext, msg);
+ // FIXME : Need to handle according to status update chain
+// GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED);
+ return TaskState.FAILED;
+ }
+ }
+ return TaskState.EXECUTING;
} catch (AppCatalogException e) {
log.error("Error while instatiating app catalog",e);
throw new TaskException("Error while instatiating app catalog", e);
@@ -99,10 +144,32 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
} catch (SSHApiException e) {
log.error("Error occurred while submitting the job", e);
throw new TaskException("Error occurred while submitting the job", e);
+ } catch (IOException e) {
+ log.error("Error while reading the content of the job file", e);
+ throw new TaskException("Error while reading the content of the job file", e);
+ } catch (InterruptedException e) {
+ log.error("Error occurred while verifying the job submission", e);
+ throw new TaskException("Error occurred while verifying the job submission", e);
}
- return null;
}
+ private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
+ JobStatus status = remoteCluster.getJobStatus(jobID);
+ return status != null && status.getJobState() != JobState.UNKNOWN;
+ }
+
+ private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+ String jobName = jobDetails.getJobName();
+ String jobId = null;
+ try {
+ jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
+ } catch (SSHApiException e) {
+ log.error("Error while verifying JobId from JobName");
+ }
+ return jobId;
+ }
+
+
@Override
public TaskState recover(TaskContext taskContext) throws TaskException {
return null;