You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 17:07:15 UTC
[2/3] fixing more packaing issue
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
new file mode 100644
index 0000000..03c3fee
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
@@ -0,0 +1,527 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gram.provider.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.JobSubmissionFault;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gram.security.GSISecurityContext;
+import org.apache.airavata.gfac.gram.util.GramJobSubmissionListener;
+import org.apache.airavata.gfac.gram.util.GramProviderUtils;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.globus.gram.WaitingForCommitException;
+import org.globus.gram.internal.GRAMConstants;
+import org.globus.gram.internal.GRAMProtocolErrorConstants;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GramProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+ private GramJob job;
+ private GramJobSubmissionListener listener;
+ private boolean twoPhase = true;
+
+ /**
+ * If normal job submission fail due to an authorisation failure or script failure we
+ * will re-attempt to submit the job. In-order to avoid any recursive loop during a continuous
+ * failure we track whether failure paths are tried or not. Following booleans keeps track whether
+ * we already tried a failure path or not.
+ */
+ /**
+ * To track job submissions during a authorisation failure while requesting job.
+ */
+ private boolean renewCredentialsAttempt = false;
+ /**
+ * To track job submission during a script error situation.
+ */
+ private boolean reSubmissionInProgress = false;
+ /**
+ * To track authorisation failures during status monitoring.
+ */
+ private boolean authorisationFailedAttempt = false;
+
+ private static final Map<String, GramJob> currentlyExecutingJobCache
+ = new ConcurrentHashMap<String, GramJob>();
+
+ private static Properties resources;
+
+ static {
+ try {
+
+ String propFileName = "errors.properties";
+ resources = new Properties();
+ InputStream inputStream = GramProvider.class.getClassLoader()
+ .getResourceAsStream(propFileName);
+
+ if (inputStream == null) {
+ throw new FileNotFoundException("property file '" + propFileName
+ + "' not found in the classpath");
+ }
+
+ resources.load(inputStream);
+
+ } catch (FileNotFoundException mre) {
+ log.error("errors.properties not found", mre);
+ } catch (IOException e) {
+ log.error("Error reading errors.properties file", e);
+ }
+ }
+
+
+ // This method prepare the environment before the application invocation.
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+
+ try {
+ super.initialize(jobExecutionContext);
+ String strTwoPhase = ServerSettings.getSetting("TwoPhase");
+ if (strTwoPhase != null) {
+ twoPhase = Boolean.parseBoolean(strTwoPhase);
+ log.info("Two phase commit is set to " + twoPhase);
+ }
+ } catch (ApplicationSettingsException e) {
+ log.warn("Error reading TwoPhase property from configurations.", e);
+ }
+
+ job = GramProviderUtils.setupEnvironment(jobExecutionContext, twoPhase);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+ job.addListener(listener);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException{
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+
+ StringBuilder stringBuilder = new StringBuilder();
+ try {
+
+ GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+ job.setCredentials(gssCred);
+ // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
+ String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
+ log.info("Request to contact:" + gateKeeper);
+
+ stringBuilder.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
+ .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
+ .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+ .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
+
+ log.info(stringBuilder.toString());
+
+ submitJobs(gateKeeper, jobExecutionContext, host);
+
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e.getMessage(), e);
+ } finally {
+ if (job != null) {
+ try {
+ /*
+ * Remove listener
+ */
+ job.removeListener(listener);
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ }
+ }
+ }
+
+ private void submitJobs(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+ boolean applicationSaved=false;
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+
+ if (twoPhase) {
+ try {
+ /*
+ * The first boolean is to force communication through SSLv3
+ * The second boolean is to specify the job is a batch job - use true for interactive and false for
+ * batch.
+ * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ job.request(true, gateKeeper, false, false);
+
+ // Single boolean to track all authentication failures, therefore we need to re-initialize
+ // this here
+ renewCredentialsAttempt = false;
+
+ } catch (WaitingForCommitException e) {
+ String jobID = job.getIDAsString();
+
+ details.setJobID(jobID);
+ details.setJobDescription(job.getRSL());
+ jobExecutionContext.setJobDetails(details);
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.UN_SUBMITTED);
+
+ applicationSaved=true;
+ String jobStatusMessage = "Un-submitted JobID= " + jobID;
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
+
+ try {
+ job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+
+ } catch (GramException gramException) {
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString(), gramException);
+ } catch (GSSException gssException) {
+
+ // User credentials are invalid
+ log.error("Error while submitting commit request - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ log.info("Attempting to renew credentials and re-submit commit signal...");
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ renewCredentials(jobExecutionContext);
+
+ try {
+ job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+ } catch (GramException e1) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("Error while sending commit request. Job Id - "
+ + job.getIDAsString() + ". Credentials provided invalid", e1);
+ }
+ }
+ GFacUtils.updateJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
+ jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ } catch (GSSException e) {
+ // Renew credentials and re-submit
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+
+ reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+
+ } catch (GramException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+
+ throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+ }
+ } else {
+
+ /*
+ * The first boolean is to force communication through SSLv3
+ * The second boolean is to specify the job is a batch job - use true for interactive and false for
+ * batch.
+ * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ try {
+
+ job.request(true, gateKeeper, false, false);
+ renewCredentialsAttempt = false;
+
+ } catch (GramException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+ } catch (GSSException e) {
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ // Renew credentials and re-submit
+ reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+ }
+
+ String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
+ log.info(jobStatusMessage);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+ }
+
+ currentlyExecutingJobCache.put(job.getIDAsString(), job);
+ /*
+ * Wait until job is done
+ */
+ listener.waitFor();
+
+ checkJobStatus(jobExecutionContext, globusHostType, gateKeeper);
+
+ }
+
+ private void renewCredentials(JobExecutionContext jobExecutionContext) throws GFacException {
+
+ renewCredentials(this.job, jobExecutionContext);
+ }
+
+ private void renewCredentials(GramJob gramJob, JobExecutionContext jobExecutionContext) throws GFacException {
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).renewCredentials();
+ gramJob.renew(gssCred);
+ } catch (GramException e1) {
+ throw new GFacException("Unable to renew credentials. Job Id - "
+ + gramJob.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ throw new GFacException("Unable to renew credentials. Job Id - "
+ + gramJob.getIDAsString(), e1);
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ private void reSubmitJob(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType, Exception e) throws GFacException, GFacProviderException {
+
+ if (!renewCredentialsAttempt) {
+
+ renewCredentialsAttempt = true;
+
+ // User credentials are invalid
+ log.error("Error while submitting job - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ log.info("Attempting to renew credentials and re-submit jobs...");
+
+ // Remove existing listener and register a new listener
+ job.removeListener(listener);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+ job.addListener(listener);
+
+ renewCredentials(jobExecutionContext);
+
+ submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+ } else {
+ throw new GFacException("Error while submitting job - Credentials provided are invalid. Job Id - "
+ + job.getIDAsString(), e);
+ }
+
+ }
+
+ private void reSubmitJob(String gateKeeper,
+ JobExecutionContext jobExecutionContext,
+ GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+
+ // User credentials are invalid
+ log.info("Attempting to renew credentials and re-submit jobs...");
+
+ // Remove existing listener and register a new listener
+ job.removeListener(listener);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+ job.addListener(listener);
+
+ renewCredentials(jobExecutionContext);
+
+ submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+ }
+
+
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ }
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ cancelSingleJob(jobId, jobExecutionContext);
+ }
+
+
+ private void cancelSingleJob(String jobId, JobExecutionContext context) throws GFacException {
+ // First check whether job id is in the cache
+ if (currentlyExecutingJobCache.containsKey(jobId)) {
+
+ synchronized (this) {
+ GramJob gramJob = currentlyExecutingJobCache.get(jobId);
+
+ // Even though we check using containsKey, at this point job could be null
+ if (gramJob != null && (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
+ gramJob.getStatus() != GRAMConstants.STATUS_FAILED)) {
+ cancelJob(gramJob, context);
+ }
+ }
+
+ } else {
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext)context.
+ getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+ GramJob gramJob = new GramJob(null);
+ try {
+ gramJob.setID(jobId);
+ } catch (MalformedURLException e) {
+ throw new GFacException("Invalid job id - " + jobId, e);
+ }
+ gramJob.setCredentials(gssCred);
+
+ synchronized (this) {
+ if (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
+ gramJob.getStatus() != GRAMConstants.STATUS_FAILED) {
+ cancelJob(gramJob, context);
+ }
+ }
+ } catch (ApplicationSettingsException e) {
+ throw new GFacException(e);
+ }
+ }
+ }
+
+ private void cancelJob(GramJob gramJob, JobExecutionContext context) throws GFacException{
+
+ try {
+ gramJob.cancel();
+ } catch (GramException e) {
+ throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e);
+ } catch (GSSException e) {
+
+ log.warn("Credentials invalid to cancel job. Attempting to renew credentials and re-try. " +
+ "Job id - " + gramJob.getIDAsString());
+ renewCredentials(gramJob, context);
+
+ try {
+ gramJob.cancel();
+ gramJob.signal(GramJob.SIGNAL_COMMIT_END);
+ } catch (GramException e1) {
+ throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e1);
+ } catch (GSSException e1) {
+ throw new GFacException("Error cancelling job, invalid credentials. Job id - "
+ + gramJob.getIDAsString(), e);
+ }
+ }
+
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacException {
+
+ }
+
+ private void checkJobStatus(JobExecutionContext jobExecutionContext, GlobusHostType host, String gateKeeper)
+ throws GFacProviderException {
+ int jobStatus = listener.getCurrentStatus();
+
+ if (jobStatus == GramJob.STATUS_FAILED) {
+
+ String errorMsg = "Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " Job Exit Code = "
+ + listener.getError() + " Error Description = " + getGramErrorString(listener.getError());
+
+ if (listener.getError() == GRAMProtocolErrorConstants.INVALID_SCRIPT_REPLY) {
+
+ // re-submitting without renewing
+ // TODO verify why we re-submit jobs when we get a invalid script reply
+ if (!reSubmissionInProgress) {
+ reSubmissionInProgress = true;
+
+ log.info("Invalid script reply received. Re-submitting job, id - " + job.getIDAsString());
+ try {
+ reSubmitJob(gateKeeper, jobExecutionContext, host);
+ } catch (GFacException e) {
+ throw new GFacProviderException
+ ("Error during re-submission. Original job submission data - " + errorMsg, e);
+ }
+ return;
+ }
+
+ } else if (listener.getError() == GRAMProtocolErrorConstants.ERROR_AUTHORIZATION) {
+
+ // re-submit with renewed credentials
+ if (!authorisationFailedAttempt) {
+ authorisationFailedAttempt = true;
+ log.info("Authorisation error contacting provider. Re-submitting job with renewed credentials.");
+
+ try {
+ renewCredentials(jobExecutionContext);
+ reSubmitJob(gateKeeper, jobExecutionContext, host);
+ } catch (GFacException e) {
+ throw new GFacProviderException
+ ("Error during re-submission. Original job submission data - " + errorMsg, e);
+ }
+
+ return;
+ }
+
+ } else if (listener.getError() == GRAMProtocolErrorConstants.USER_CANCELLED) {
+
+ log.info("User successfully cancelled job id " + job.getIDAsString());
+ return;
+ }
+
+
+
+ log.error(errorMsg);
+
+ synchronized (this) {
+ currentlyExecutingJobCache.remove(job.getIDAsString());
+ }
+
+ throw new JobSubmissionFault(new Exception(errorMsg), host.getHostAddress(), gateKeeper,
+ job.getRSL(), jobExecutionContext, getGramErrorString(listener.getError()),
+ listener.getError());
+
+ } else if (jobStatus == GramJob.STATUS_DONE) {
+ log.info("Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " is successfully executed.");
+
+ synchronized (this) {
+ currentlyExecutingJobCache.remove(job.getIDAsString());
+ }
+ }
+ }
+
+ public String getGramErrorString(int errorCode) {
+
+ if (resources != null) {
+ try {
+ return resources.getProperty(String.valueOf(errorCode));
+ } catch (MissingResourceException mre) {
+ log.warn("Error reading globus error descriptions.", mre);
+ return "Error code: " + errorCode;
+ }
+ } else {
+ return "Error code: " + errorCode;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
deleted file mode 100644
index 5ba5ebf..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.impl;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.cpi.GFacImpl;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.handler.ThreadedHandler;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
-import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class GSISSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.info("Invoking GSISSH Provider Invoke ...");
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if(jobID == null){
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- }else{
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
-
- // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
- // to perform monitoring, daemon handlers can be accessed from anywhere
- List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
- ThreadedHandler pullMonitorHandler = null;
- for(ThreadedHandler threadedHandler:daemonHandlers){
- if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
- pullMonitorHandler = threadedHandler;
- }
- }
- // we know this hos is type GsiSSHHostType
- String monitorMode = ((GsisshHostType) host).getMonitorMode();
- if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- }else{
- log.error("Currently we only support Pull monitoring");
- }
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..abca9d9
--- /dev/null
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.handler.ThreadedHandler;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class GSISSHProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ log.info("Invoking GSISSH Provider Invoke ...");
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ String taskID = jobExecutionContext.getTaskData().getTaskID();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ // This installed path is a mandetory field, because this could change based on the computing resource
+ JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+ log.info(jobDescriptor.toXML());
+
+ jobDetails.setJobDescription(jobDescriptor.toXML());
+
+ String jobID = cluster.submitBatchJob(jobDescriptor);
+ jobExecutionContext.setJobDetails(jobDetails);
+ if(jobID == null){
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ }else{
+ jobDetails.setJobID(jobID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ }
+
+
+ // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
+ // to perform monitoring, daemon handlers can be accessed from anywhere
+ List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+ ThreadedHandler pullMonitorHandler = null;
+ for(ThreadedHandler threadedHandler:daemonHandlers){
+ if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
+ pullMonitorHandler = threadedHandler;
+ }
+ }
+ // we know this hos is type GsiSSHHostType
+ String monitorMode = ((GsisshHostType) host).getMonitorMode();
+ if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ pullMonitorHandler.invoke(jobExecutionContext);
+ }else{
+ log.error("Currently we only support Pull monitoring");
+ }
+ } catch (SSHApiException e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ } catch (Exception e) {
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
+ jobDetails.setJobID("none");
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ throw new GFacProviderException(error, e);
+ }
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
deleted file mode 100644
index f0a0bf9..0000000
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
-import org.apache.airavata.gfac.local.utils.InputUtils;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.provider.utils.ProviderUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.NameValuePairType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-public class LocalProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
- private ProcessBuilder builder;
- private List<String> cmdList;
- private String jobId;
-
- public static class LocalProviderJobData{
- private String applicationName;
- private List<String> inputParameters;
- private String workingDir;
- private String inputDir;
- private String outputDir;
- public String getApplicationName() {
- return applicationName;
- }
- public void setApplicationName(String applicationName) {
- this.applicationName = applicationName;
- }
- public List<String> getInputParameters() {
- return inputParameters;
- }
- public void setInputParameters(List<String> inputParameters) {
- this.inputParameters = inputParameters;
- }
- public String getWorkingDir() {
- return workingDir;
- }
- public void setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- }
- public String getInputDir() {
- return inputDir;
- }
- public void setInputDir(String inputDir) {
- this.inputDir = inputDir;
- }
- public String getOutputDir() {
- return outputDir;
- }
- public void setOutputDir(String outputDir) {
- this.outputDir = outputDir;
- }
- }
- public LocalProvider(){
- cmdList = new ArrayList<String>();
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
- super.initialize(jobExecutionContext);
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
-
- buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
- initProcessBuilder(app);
-
- // extra environment variables
- builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
- builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
-
- // set working directory
- builder.directory(new File(app.getStaticWorkingDirectory()));
-
- // log info
- log.info("Command = " + InputUtils.buildCommand(cmdList));
- log.info("Working dir = " + builder.directory());
- for (String key : builder.environment().keySet()) {
- log.info("Env[" + key + "] = " + builder.environment().get(key));
- }
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- ApplicationDeploymentDescriptionType app = jobExecutionContext.
- getApplicationContext().getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- try {
- jobId = jobExecutionContext.getTaskData().getTaskID();
- jobDetails.setJobID(jobId);
- jobDetails.setJobDescription(app.toString());
- jobExecutionContext.setJobDetails(jobDetails);
- jobDetails.setJobDescription(app.toString());
- GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
- // running cmd
- Process process = builder.start();
-
- Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
- Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
-
- // start output threads
- standardOutWriter.setDaemon(true);
- standardErrorWriter.setDaemon(true);
- standardOutWriter.start();
- standardErrorWriter.start();
-
- int returnValue = process.waitFor();
-
- // make sure other two threads are done
- standardOutWriter.join();
- standardErrorWriter.join();
-
- /*
- * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
- * just provide warning in the log messages
- */
- if (returnValue != 0) {
- log.error("Process finished with non zero return value. Process may have failed");
- } else {
- log.info("Process finished with return value of zero.");
- }
-
- StringBuffer buf = new StringBuffer();
- buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
- .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
- .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
- .append(String.valueOf(returnValue));
- log.info(buf.toString());
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (InterruptedException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
-
-// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
-// throws GFacProviderException {
-// ApplicationDeploymentDescriptionType app = jobExecutionContext.
-// getApplicationContext().getApplicationDeploymentDescription().getType();
-// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
-// appJob.setJobId(jobId);
-// LocalProviderJobData data = new LocalProviderJobData();
-// data.setApplicationName(app.getExecutableLocation());
-// data.setInputDir(app.getInputDataDirectory());
-// data.setOutputDir(app.getOutputDataDirectory());
-// data.setWorkingDir(builder.directory().toString());
-// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
-// ByteArrayOutputStream stream = new ByteArrayOutputStream();
-// JAXB.marshal(data, stream);
-// appJob.setJobData(stream.toString());
-// appJob.setSubmittedTime(Calendar.getInstance().getTime());
-// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
-// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
-// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
-// }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-
- try {
- String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
- String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- } catch (XmlException e) {
- throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (Exception e){
- throw new GFacProviderException("Error in retrieving results",e);
- }
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
-
- private void buildCommand(String executable, List<String> inputParameterList){
- cmdList.add(executable);
- cmdList.addAll(inputParameterList);
- }
-
- private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
- builder = new ProcessBuilder(cmdList);
-
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- if(env != null && env.length > 0){
- Map<String,String> builderEnv = builder.environment();
- for (NameValuePairType entry : env) {
- builderEnv.put(entry.getName(), entry.getValue());
- }
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
new file mode 100644
index 0000000..e4d55b8
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class LocalProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+ private ProcessBuilder builder;
+ private List<String> cmdList;
+ private String jobId;
+
+ public static class LocalProviderJobData{
+ private String applicationName;
+ private List<String> inputParameters;
+ private String workingDir;
+ private String inputDir;
+ private String outputDir;
+ public String getApplicationName() {
+ return applicationName;
+ }
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+ public List<String> getInputParameters() {
+ return inputParameters;
+ }
+ public void setInputParameters(List<String> inputParameters) {
+ this.inputParameters = inputParameters;
+ }
+ public String getWorkingDir() {
+ return workingDir;
+ }
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+ public String getInputDir() {
+ return inputDir;
+ }
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+ public String getOutputDir() {
+ return outputDir;
+ }
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+ }
+ public LocalProvider(){
+ cmdList = new ArrayList<String>();
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ super.initialize(jobExecutionContext);
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+ getApplicationDeploymentDescription().getType();
+
+ buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
+ initProcessBuilder(app);
+
+ // extra environment variables
+ builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
+ builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
+
+ // set working directory
+ builder.directory(new File(app.getStaticWorkingDirectory()));
+
+ // log info
+ log.info("Command = " + InputUtils.buildCommand(cmdList));
+ log.info("Working dir = " + builder.directory());
+ for (String key : builder.environment().keySet()) {
+ log.info("Env[" + key + "] = " + builder.environment().get(key));
+ }
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.
+ getApplicationContext().getApplicationDeploymentDescription().getType();
+ JobDetails jobDetails = new JobDetails();
+ try {
+ jobId = jobExecutionContext.getTaskData().getTaskID();
+ jobDetails.setJobID(jobId);
+ jobDetails.setJobDescription(app.toString());
+ jobExecutionContext.setJobDetails(jobDetails);
+ jobDetails.setJobDescription(app.toString());
+ GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
+ // running cmd
+ Process process = builder.start();
+
+ Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
+ Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
+
+ // start output threads
+ standardOutWriter.setDaemon(true);
+ standardErrorWriter.setDaemon(true);
+ standardOutWriter.start();
+ standardErrorWriter.start();
+
+ int returnValue = process.waitFor();
+
+ // make sure other two threads are done
+ standardOutWriter.join();
+ standardErrorWriter.join();
+
+ /*
+ * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+ * just provide warning in the log messages
+ */
+ if (returnValue != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ StringBuffer buf = new StringBuffer();
+ buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+ .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
+ .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
+ .append(String.valueOf(returnValue));
+ log.info(buf.toString());
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (InterruptedException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+
+// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+// throws GFacProviderException {
+// ApplicationDeploymentDescriptionType app = jobExecutionContext.
+// getApplicationContext().getApplicationDeploymentDescription().getType();
+// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+// appJob.setJobId(jobId);
+// LocalProviderJobData data = new LocalProviderJobData();
+// data.setApplicationName(app.getExecutableLocation());
+// data.setInputDir(app.getInputDataDirectory());
+// data.setOutputDir(app.getOutputDataDirectory());
+// data.setWorkingDir(builder.directory().toString());
+// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+// ByteArrayOutputStream stream = new ByteArrayOutputStream();
+// JAXB.marshal(data, stream);
+// appJob.setJobData(stream.toString());
+// appJob.setSubmittedTime(Calendar.getInstance().getTime());
+// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+// }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+ try {
+ String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
+ String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ } catch (XmlException e) {
+ throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (Exception e){
+ throw new GFacProviderException("Error in retrieving results",e);
+ }
+ }
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ private void buildCommand(String executable, List<String> inputParameterList){
+ cmdList.add(executable);
+ cmdList.addAll(inputParameterList);
+ }
+
+ private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
+ builder = new ProcessBuilder(cmdList);
+
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ if(env != null && env.length > 0){
+ Map<String,String> builderEnv = builder.environment();
+ for (NameValuePairType entry : env) {
+ builderEnv.put(entry.getName(), entry.getValue());
+ }
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 902e424..afb101c 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -27,7 +27,7 @@ import org.apache.airavata.gfac.context.ApplicationContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
-import org.apache.airavata.gfac.local.impl.LocalProvider;
+import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
index b9432da..8147853 100644
--- a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
@@ -19,7 +19,7 @@
</InHandlers>
<OutHandlers></OutHandlers>
</GlobalHandlers>
- <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
<InHandlers>
<Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
</InHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
deleted file mode 100644
index ee3dcd2..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.ssh.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.handler.GFacHandlerException;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.CommandExecutor;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
-import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-/**
- * Execute application using remote SSH
- */
-public class SSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
- private Cluster cluster;
- private String jobID = null;
- private String taskID = null;
- // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
- private boolean hpcType = false;
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- taskID = jobExecutionContext.getTaskData().getTaskID();
- if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
- jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobID(taskID);
- details.setJobDescription(remoteFile);
- jobExecutionContext.setJobDetails(details);
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
- details.setJobDescription(jobDescriptor.toXML());
-
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
- log.info(remoteFile);
- try {
- File runscript = createShellScript(jobExecutionContext);
- cluster.scpTo(remoteFile, runscript.getAbsolutePath());
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- }else{
- hpcType = true;
- }
- }
-
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- if (!hpcType) {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- try {
- /*
- * Execute
- */
- String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- details.setJobDescription(execuable);
-
-// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
-
- StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
-
- CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
- String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
-
- log.info("stdout=" + stdOutputString);
-
-// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
- } catch (Exception e) {
- throw new GFacProviderException(e.getMessage(), e);
- } finally {
- if (cluster != null) {
- try {
- cluster.disconnect();
- } catch (SSHApiException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
- } else {
- try {
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
- cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if (jobID == null) {
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- } catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-
- }
-
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
-
- private File createShellScript(JobExecutionContext context) throws IOException {
- ApplicationDeploymentDescriptionType app = context.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
- + new Random().nextLong();
-
- File shellScript = File.createTempFile(uniqueDir, "sh");
- OutputStream out = new FileOutputStream(shellScript);
-
- out.write("#!/bin/bash\n".getBytes());
- out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
- .getBytes());
- // get the env of the host and the application
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- Map<String, String> nv = new HashMap<String, String>();
- if (env != null) {
- for (int i = 0; i < env.length; i++) {
- String key = env[i].getName();
- String value = env[i].getValue();
- nv.put(key, value);
- }
- }
- for (Entry<String, String> entry : nv.entrySet()) {
- log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
- out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
- }
-
- // prepare the command
- final String SPACE = " ";
- StringBuffer cmd = new StringBuffer();
- cmd.append(app.getExecutableLocation());
- cmd.append(SPACE);
-
- MessageContext input = context.getInMessageContext();
- ;
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
- for (String value : values) {
- cmd.append(value);
- cmd.append(SPACE);
- }
- } else {
- String paramValue = MappingFactory.toString(actualParameter);
- cmd.append(paramValue);
- cmd.append(SPACE);
- }
- }
- // We redirect the error and stdout to remote files, they will be read
- // in later
- cmd.append(SPACE);
- cmd.append("1>");
- cmd.append(SPACE);
- cmd.append(app.getStandardOutput());
- cmd.append(SPACE);
- cmd.append("2>");
- cmd.append(SPACE);
- cmd.append(app.getStandardError());
-
- String cmdStr = cmd.toString();
- log.info("Command = " + cmdStr);
- out.write((cmdStr + "\n").getBytes());
- String message = "\"execuationSuceeded\"";
- out.write(("echo " + message + "\n").getBytes());
- out.close();
-
- return shellScript;
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
- /**
- * This method will read standard output and if there's any it will be parsed
- * @param jobIDReaderCommandOutput
- * @param errorMsg
- * @return
- * @throws SSHApiException
- */
- private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
- String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
- String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
-
- if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
- log.error("Standard Error output : " + stdErrorString);
- throw new SSHApiException(errorMsg + stdErrorString);
- }
- return stdOutputString;
- }
-
-
-}